Skip to main content

vector/topology/
running.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{Arc, Mutex},
4};
5
6use futures::{Future, FutureExt, future};
7use snafu::Snafu;
8use stream_cancel::Trigger;
9use tokio::{
10    sync::{mpsc, watch},
11    time::{Duration, Instant, interval, sleep_until},
12};
13use tracing::Instrument;
14use vector_lib::{
15    buffers::topology::channel::BufferSender,
16    shutdown::ShutdownSignal,
17    tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
18    trigger::DisabledTrigger,
19};
20
21use super::{
22    BuiltBuffer, TaskHandle,
23    builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
24    fanout::{ControlChannel, ControlMessage},
25    handle_errors, retain, take_healthchecks,
26    task::{Task, TaskOutput},
27};
28use crate::{
29    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
30    event::EventArray,
31    extra_context::ExtraContext,
32    shutdown::SourceShutdownCoordinator,
33    signal::ShutdownError,
34    spawn_named,
35    utilization::UtilizationRegistry,
36};
37
38pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
39
40#[derive(Debug, Snafu)]
41pub enum ReloadError {
42    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
43    GlobalOptionsChanged { changed_fields: Vec<String> },
44    #[snafu(display("failed to compute global diff: {}", source))]
45    GlobalDiffFailed { source: serde_json::Error },
46    #[snafu(display("topology build failed"))]
47    TopologyBuildFailed,
48    #[snafu(display("failed to restore previous config"))]
49    FailedToRestore,
50}
51
52#[allow(dead_code)]
53pub struct RunningTopology {
54    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
55    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
56    outputs: HashMap<OutputId, ControlChannel>,
57    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
58    component_type_names: HashMap<ComponentKey, String>,
59    source_tasks: HashMap<ComponentKey, TaskHandle>,
60    tasks: HashMap<ComponentKey, TaskHandle>,
61    shutdown_coordinator: SourceShutdownCoordinator,
62    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
63    pub(crate) config: Config,
64    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
65    watch: (WatchTx, WatchRx),
66    graceful_shutdown_duration: Option<Duration>,
67    utilization_registry: Option<UtilizationRegistry>,
68    utilization_task: Option<TaskHandle>,
69    utilization_task_shutdown_trigger: Option<Trigger>,
70    metrics_task: Option<TaskHandle>,
71    metrics_task_shutdown_trigger: Option<Trigger>,
72    pending_reload: Option<HashSet<ComponentKey>>,
73}
74
75impl RunningTopology {
76    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
77        Self {
78            inputs: HashMap::new(),
79            inputs_tap_metadata: HashMap::new(),
80            outputs: HashMap::new(),
81            outputs_tap_metadata: HashMap::new(),
82            component_type_names: HashMap::new(),
83            shutdown_coordinator: SourceShutdownCoordinator::default(),
84            detach_triggers: HashMap::new(),
85            source_tasks: HashMap::new(),
86            tasks: HashMap::new(),
87            abort_tx,
88            watch: watch::channel(TapResource::default()),
89            graceful_shutdown_duration: config.graceful_shutdown_duration,
90            config,
91            utilization_registry: None,
92            utilization_task: None,
93            utilization_task_shutdown_trigger: None,
94            metrics_task: None,
95            metrics_task_shutdown_trigger: None,
96            pending_reload: None,
97        }
98    }
99
100    /// Gets the configuration that represents this running topology.
101    pub const fn config(&self) -> &Config {
102        &self.config
103    }
104
105    /// Adds a set of component keys to the pending reload set if one exists. Otherwise, it
106    /// initializes the pending reload set.
107    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
108        match &mut self.pending_reload {
109            None => self.pending_reload = Some(new_set.clone()),
110            Some(existing) => existing.extend(new_set),
111        }
112    }
113
114    /// Creates a subscription to topology changes.
115    ///
116    /// This is used by the tap API to observe configuration changes, and re-wire tap sinks.
117    pub fn watch(&self) -> watch::Receiver<TapResource> {
118        self.watch.1.clone()
119    }
120
121    /// Signal that all sources in this topology are ended.
122    ///
123    /// The future returned by this function will finish once all the sources in
124    /// this topology have finished. This allows the caller to wait for or
125    /// detect that the sources in the topology are no longer
126    /// producing. [`Application`][crate::app::Application], as an example, uses this as a
127    /// shutdown signal.
128    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
129        self.shutdown_coordinator.shutdown_tripwire()
130    }
131
132    /// Shut down all topology components.
133    ///
134    /// This function sends the shutdown signal to all sources in this topology
135    /// and returns a future that resolves once all components (sources,
136    /// transforms, and sinks) have finished shutting down. Transforms and sinks
137    /// will shut down automatically once their input tasks finish.
138    ///
139    /// This function takes ownership of `self`, so once it returns everything
140    /// in the [`RunningTopology`] instance has been dropped except for the
141    /// `tasks` map. This map gets moved into the returned future and is used to
142    /// poll for when the tasks have completed. Once the returned future is
143    /// dropped then everything from this RunningTopology instance is fully
144    /// dropped.
145    pub fn stop(self) -> impl Future<Output = ()> {
146        // Create handy handles collections of all tasks for the subsequent
147        // operations.
148        let mut wait_handles = Vec::new();
149        // We need a Vec here since source components have two tasks. One for
150        // pump in self.tasks, and the other for source in self.source_tasks.
151        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
152
153        let map_closure = |_result| ();
154
155        // We need to give some time to the sources to gracefully shutdown, so
156        // we will merge them with other tasks.
157        for (key, task) in self.tasks.into_iter().chain(self.source_tasks) {
158            let task = task.map(map_closure).shared();
159
160            wait_handles.push(task.clone());
161            check_handles.entry(key).or_default().push(task);
162        }
163
164        if let Some(utilization_task) = self.utilization_task {
165            wait_handles.push(utilization_task.map(map_closure).shared());
166        }
167
168        if let Some(metrics_task) = self.metrics_task {
169            wait_handles.push(metrics_task.map(map_closure).shared());
170        }
171
172        // If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
173        let deadline = self
174            .graceful_shutdown_duration
175            .map(|grace_period| Instant::now() + grace_period);
176
177        let timeout = if let Some(deadline) = deadline {
178            // If we reach the deadline, this future will print out which components
179            // won't gracefully shutdown since we will start to forcefully shutdown
180            // the sources.
181            let mut check_handles2 = check_handles.clone();
182            Box::pin(async move {
183                sleep_until(deadline).await;
184                // Remove all tasks that have shutdown.
185                check_handles2.retain(|_key, handles| {
186                    retain(handles, |handle| handle.peek().is_none());
187                    !handles.is_empty()
188                });
189                let remaining_components = check_handles2
190                    .keys()
191                    .map(|item| item.to_string())
192                    .collect::<Vec<_>>()
193                    .join(", ");
194
195                error!(
196                    components = ?remaining_components,
197                    message = "Failed to gracefully shut down in time. Killing components.",
198                    internal_log_rate_limit = false
199                );
200            }) as future::BoxFuture<'static, ()>
201        } else {
202            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
203        };
204
205        // Reports in intervals which components are still running.
206        let mut interval = interval(Duration::from_secs(5));
207        let reporter = async move {
208            loop {
209                interval.tick().await;
210
211                // Remove all tasks that have shutdown.
212                check_handles.retain(|_key, handles| {
213                    retain(handles, |handle| handle.peek().is_none());
214                    !handles.is_empty()
215                });
216                let remaining_components = check_handles
217                    .keys()
218                    .map(|item| item.to_string())
219                    .collect::<Vec<_>>()
220                    .join(", ");
221
222                let (deadline_passed, time_remaining) = match deadline {
223                    Some(d) => match d.checked_duration_since(Instant::now()) {
224                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
225                        None => (true, "overdue".to_string()),
226                    },
227                    None => (false, "no time limit".to_string()),
228                };
229
230                info!(
231                    remaining_components = ?remaining_components,
232                    time_remaining = ?time_remaining,
233                    "Shutting down... Waiting on running components."
234                );
235
236                let all_done = check_handles.is_empty();
237
238                if all_done {
239                    info!("Shutdown reporter exiting: all components shut down.");
240                    break;
241                } else if deadline_passed {
242                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
243                    break;
244                }
245            }
246        };
247
248        // Finishes once all tasks have shutdown.
249        let success = futures::future::join_all(wait_handles).map(|_| ());
250
251        // Aggregate future that ends once anything detects that all tasks have shutdown.
252        let shutdown_complete_future = future::select_all(vec![
253            Box::pin(timeout) as future::BoxFuture<'static, ()>,
254            Box::pin(reporter) as future::BoxFuture<'static, ()>,
255            Box::pin(success) as future::BoxFuture<'static, ()>,
256        ]);
257
258        // Now kick off the shutdown process by shutting down the sources.
259        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
260        if let Some(trigger) = self.utilization_task_shutdown_trigger {
261            trigger.cancel();
262        }
263        if let Some(trigger) = self.metrics_task_shutdown_trigger {
264            trigger.cancel();
265        }
266
267        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
268    }
269
270    /// Attempts to load a new configuration and update this running topology.
271    ///
272    /// If the new configuration was valid, and all changes were able to be made -- removing of
273    /// old components, changing of existing components, adding of new components -- then
274    /// `Ok(())` is returned.
275    ///
276    /// If the new configuration is not valid, or not all of the changes in the new configuration
277    /// were able to be made, then this method will attempt to undo the changes made and bring the
278    /// topology back to its previous state, returning the appropriate error.
279    ///
280    /// If the restore also fails, `ReloadError::FailedToRestore` is returned.
281    pub async fn reload_config_and_respawn(
282        &mut self,
283        new_config: Config,
284        extra_context: ExtraContext,
285    ) -> Result<(), ReloadError> {
286        info!("Reloading running topology with new configuration.");
287
288        if self.config.global != new_config.global {
289            return match self.config.global.diff(&new_config.global) {
290                Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
291                Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
292            };
293        }
294
295        // Calculate the change between the current configuration and the new configuration, and
296        // shutdown any components that are changing so that we can reclaim their buffers before
297        // spawning the new version of the component.
298        //
299        // We also shutdown any component that is simply being removed entirely.
300        let diff = if let Some(components) = &self.pending_reload {
301            ConfigDiff::new(&self.config, &new_config, components.clone())
302        } else {
303            ConfigDiff::new(&self.config, &new_config, HashSet::new())
304        };
305        let buffers = self.shutdown_diff(&diff, &new_config).await;
306
307        // Gives windows some time to make available any port
308        // released by shutdown components.
309        // Issue: https://github.com/vectordotdev/vector/issues/3035
310        if cfg!(windows) {
311            // This value is guess work.
312            tokio::time::sleep(Duration::from_millis(200)).await;
313        }
314
315        // Try to build all of the new components coming from the new configuration.  If we can
316        // successfully build them, we'll attempt to connect them up to the topology and spawn their
317        // respective component tasks.
318        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
319            .with_buffers(buffers.clone())
320            .with_extra_context(extra_context.clone())
321            .with_utilization_registry(self.utilization_registry.clone())
322            .build_or_log_errors()
323            .await
324        {
325            // If healthchecks are configured for any of the changing/new components, try running
326            // them before moving forward with connecting and spawning.  In some cases, healthchecks
327            // failing may be configured as a non-blocking issue and so we'll still continue on.
328            if self
329                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
330                .await
331            {
332                self.connect_diff(&diff, &mut new_pieces).await;
333                self.spawn_diff(&diff, new_pieces);
334                self.config = new_config;
335
336                info!("New configuration loaded successfully.");
337
338                return Ok(());
339            }
340        }
341
342        // We failed to build, connect, and spawn all of the changed/new components, so we flip
343        // around the configuration differential to generate all the components that we need to
344        // bring back to restore the current configuration.
345        warn!("Failed to completely load new configuration. Restoring old configuration.");
346
347        let diff = diff.flip();
348        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
349            .with_buffers(buffers)
350            .with_extra_context(extra_context.clone())
351            .with_utilization_registry(self.utilization_registry.clone())
352            .build_or_log_errors()
353            .await
354            && self
355                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
356                .await
357        {
358            self.connect_diff(&diff, &mut new_pieces).await;
359            self.spawn_diff(&diff, new_pieces);
360
361            info!("Old configuration restored successfully.");
362
363            return Err(ReloadError::TopologyBuildFailed);
364        }
365
366        error!(
367            message = "Failed to restore old configuration.",
368            internal_log_rate_limit = false
369        );
370
371        Err(ReloadError::FailedToRestore)
372    }
373
374    /// Attempts to reload enrichment tables.
375    pub(crate) async fn reload_enrichment_tables(&self) {
376        reload_enrichment_tables(&self.config).await;
377    }
378
379    pub(crate) async fn run_healthchecks(
380        &mut self,
381        diff: &ConfigDiff,
382        pieces: &mut TopologyPieces,
383        options: HealthcheckOptions,
384    ) -> bool {
385        if options.enabled {
386            let healthchecks = take_healthchecks(diff, pieces)
387                .into_iter()
388                .map(|(_, task)| task);
389            let healthchecks = future::try_join_all(healthchecks);
390
391            info!("Running healthchecks.");
392            if options.require_healthy {
393                let success = healthchecks.await;
394
395                if success.is_ok() {
396                    info!("All healthchecks passed.");
397                    true
398                } else {
399                    error!(
400                        message = "Sinks unhealthy.",
401                        internal_log_rate_limit = false
402                    );
403                    false
404                }
405            } else {
406                tokio::spawn(healthchecks);
407                true
408            }
409        } else {
410            true
411        }
412    }
413
414    /// Shuts down any changed/removed component in the given configuration diff.
415    ///
416    /// If buffers for any of the changed/removed components can be recovered, they'll be returned.
417    async fn shutdown_diff(
418        &mut self,
419        diff: &ConfigDiff,
420        new_config: &Config,
421    ) -> HashMap<ComponentKey, BuiltBuffer> {
422        // First, we shutdown any changed/removed sources. This ensures that we can allow downstream
423        // components to terminate naturally by virtue of the flow of events stopping.
424        if diff.sources.any_changed_or_removed()
425            || diff.enrichment_tables.sources.any_changed_or_removed()
426        {
427            let timeout = Duration::from_secs(30);
428            let mut source_shutdown_handles = Vec::new();
429
430            let deadline = Instant::now() + timeout;
431            for key in diff
432                .sources
433                .to_remove
434                .iter()
435                .chain(diff.enrichment_tables.sources.to_remove.iter())
436            {
437                debug!(component_id = %key, "Removing source.");
438
439                let previous = self.tasks.remove(key).unwrap();
440                drop(previous); // detach and forget
441
442                self.remove_outputs(key);
443                source_shutdown_handles
444                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
445            }
446
447            for key in diff
448                .sources
449                .to_change
450                .iter()
451                .chain(diff.enrichment_tables.sources.to_change.iter())
452            {
453                debug!(component_id = %key, "Changing source.");
454
455                self.remove_outputs(key);
456                source_shutdown_handles
457                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
458            }
459
460            debug!(
461                "Waiting for up to {} seconds for source(s) to finish shutting down.",
462                timeout.as_secs()
463            );
464            futures::future::join_all(source_shutdown_handles).await;
465
466            // Final cleanup pass now that all changed/removed sources have signalled as having shutdown.
467            for key in diff.sources.removed_and_changed() {
468                if let Some(task) = self.source_tasks.remove(key) {
469                    task.await.unwrap().unwrap();
470                }
471            }
472        }
473
474        // Next, we shutdown any changed/removed transforms.  Same as before: we want allow
475        // downstream components to terminate naturally by virtue of the flow of events stopping.
476        //
477        // Since transforms are entirely driven by the flow of events into them from upstream
478        // components, the shutdown of sources they depend on, or the shutdown of transforms they
479        // depend on, and thus the closing of their buffer, will naturally cause them to shutdown,
480        // which is why we don't do any manual triggering of shutdown here.
481        for key in &diff.transforms.to_remove {
482            debug!(component_id = %key, "Removing transform.");
483
484            let previous = self.tasks.remove(key).unwrap();
485            drop(previous); // detach and forget
486
487            self.remove_inputs(key, diff, new_config).await;
488            self.remove_outputs(key);
489
490            if let Some(registry) = self.utilization_registry.as_ref() {
491                registry.remove_component(key);
492            }
493        }
494
495        for key in &diff.transforms.to_change {
496            debug!(component_id = %key, "Changing transform.");
497
498            self.remove_inputs(key, diff, new_config).await;
499            self.remove_outputs(key);
500        }
501
502        // Now we'll process any changed/removed sinks.
503        //
504        // At this point both the old and the new config don't have conflicts in their resource
505        // usage. So if we combine their resources, all found conflicts are between to be removed
506        // and to be added components.
507        let removed_table_sinks = diff
508            .enrichment_tables
509            .sinks
510            .removed_and_changed()
511            .map(|key| {
512                (
513                    key.clone(),
514                    enrichment_table_sink_resources(&self.config, key),
515                )
516            })
517            .collect::<Vec<_>>();
518        let remove_sink = diff
519            .sinks
520            .removed_and_changed()
521            .map(|key| {
522                (
523                    key,
524                    self.config
525                        .sink(key)
526                        .map(|s| s.resources(key))
527                        .unwrap_or_default(),
528                )
529            })
530            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
531        let add_source = diff
532            .sources
533            .changed_and_added()
534            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
535        let added_table_sinks = diff
536            .enrichment_tables
537            .sinks
538            .changed_and_added()
539            .map(|key| {
540                (
541                    key.clone(),
542                    enrichment_table_sink_resources(new_config, key),
543                )
544            })
545            .collect::<Vec<_>>();
546        let add_sink = diff
547            .sinks
548            .changed_and_added()
549            .map(|key| {
550                (
551                    key,
552                    new_config
553                        .sink(key)
554                        .map(|s| s.resources(key))
555                        .unwrap_or_default(),
556                )
557            })
558            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
559        let conflicts = Resource::conflicts(
560            remove_sink.map(|(key, value)| ((true, key), value)).chain(
561                add_sink
562                    .chain(add_source)
563                    .map(|(key, value)| ((false, key), value)),
564            ),
565        )
566        .into_values()
567        .flatten()
568        .collect::<HashSet<_>>();
569        // Existing conflicting sinks
570        let conflicting_sinks = conflicts
571            .into_iter()
572            .filter(|&(existing_sink, _)| existing_sink)
573            .map(|(_, key)| key.clone());
574
575        // For any sink whose buffer configuration didn't change, we can reuse their buffer.
576        let reuse_buffers = diff
577            .sinks
578            .to_change
579            .iter()
580            .chain(diff.enrichment_tables.sinks.to_change.iter())
581            .filter(|&key| {
582                if diff.components_to_reload.contains(key) {
583                    return false;
584                }
585                self.config
586                    .sink(key)
587                    .map(|s| s.buffer.clone())
588                    .or_else(|| enrichment_table_sink_buffer(&self.config, key))
589                    == new_config
590                        .sink(key)
591                        .map(|s| s.buffer.clone())
592                        .or_else(|| enrichment_table_sink_buffer(new_config, key))
593            })
594            .cloned()
595            .collect::<HashSet<_>>();
596
597        // For any existing sink that has a conflicting resource dependency with a changed/added
598        // sink, for any sink that we want to reuse their buffer, or for any changed sink with
599        // a disk buffer that is not being reused, we need to explicitly wait for them to finish
600        // processing so we can reclaim ownership of those resources/buffers.
601        let changed_disk_buffer_sinks = diff
602            .sinks
603            .to_change
604            .iter()
605            .filter(|key| {
606                !reuse_buffers.contains(*key)
607                    && (self
608                        .config
609                        .sink(key)
610                        .is_some_and(|s| s.buffer.has_disk_stage())
611                        || enrichment_table_sink_buffer(&self.config, key)
612                            .is_some_and(|buffer| buffer.has_disk_stage()))
613            })
614            .cloned()
615            .collect::<HashSet<_>>();
616
617        let wait_for_sinks = conflicting_sinks
618            .chain(reuse_buffers.iter().cloned())
619            .chain(changed_disk_buffer_sinks.iter().cloned())
620            .collect::<HashSet<_>>();
621
622        // First, we remove any inputs to removed sinks so they can naturally shut down.
623        let removed_sinks = diff
624            .sinks
625            .to_remove
626            .iter()
627            .chain(diff.enrichment_tables.sinks.to_remove.iter())
628            .collect::<Vec<_>>();
629        for key in &removed_sinks {
630            debug!(component_id = %key, "Removing sink.");
631            self.remove_inputs(key, diff, new_config).await;
632
633            if let Some(registry) = self.utilization_registry.as_ref() {
634                registry.remove_component(key);
635            }
636        }
637
638        // After that, for any changed sinks, we temporarily detach their inputs (not remove) so
639        // they can naturally shutdown and allow us to recover their buffers if possible.
640        let mut buffer_tx = HashMap::new();
641
642        let sinks_to_change = diff
643            .sinks
644            .to_change
645            .iter()
646            .chain(diff.enrichment_tables.sinks.to_change.iter())
647            .collect::<Vec<_>>();
648
649        for key in &sinks_to_change {
650            debug!(component_id = %key, "Changing sink.");
651            if reuse_buffers.contains(key) || changed_disk_buffer_sinks.contains(key) {
652                self.detach_triggers
653                    .remove(key)
654                    .unwrap()
655                    .into_inner()
656                    .cancel();
657
658                if reuse_buffers.contains(key) {
659                    // We explicitly clone the input side of the buffer and store it so we don't lose
660                    // it when we remove the inputs below.
661                    //
662                    // We clone instead of removing here because otherwise the input will be missing for
663                    // the rest of the reload process, which violates the assumption that all previous
664                    // inputs for components not being removed are still available. It's simpler to
665                    // allow the "old" input to stick around and be replaced (even though that's
666                    // basically a no-op since we're reusing the same buffer) than it is to pass around
667                    // info about which sinks are having their buffers reused and treat them differently
668                    // at other stages.
669                    buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
670                }
671            }
672            self.remove_inputs(key, diff, new_config).await;
673        }
674
675        // Now that we've disconnected or temporarily detached the inputs to all changed/removed
676        // sinks, we can actually wait for them to shutdown before collecting any buffers that are
677        // marked for reuse.
678        //
679        // If a sink we're removing isn't tying up any resource that a changed/added sink depends
680        // on, we don't bother waiting for it to shutdown.
681        for key in &removed_sinks {
682            let previous = self.tasks.remove(key).unwrap();
683            if wait_for_sinks.contains(key) {
684                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
685                previous.await.unwrap().unwrap();
686            } else {
687                drop(previous); // detach and forget
688            }
689        }
690
691        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
692        for key in &sinks_to_change {
693            if wait_for_sinks.contains(key) {
694                let previous = self.tasks.remove(key).unwrap();
695                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
696                let buffer = previous.await.unwrap().unwrap();
697
698                if reuse_buffers.contains(key) {
699                    // We clone instead of removing here because otherwise the input will be
700                    // missing for the rest of the reload process, which violates the assumption
701                    // that all previous inputs for components not being removed are still
702                    // available. It's simpler to allow the "old" input to stick around and be
703                    // replaced (even though that's basically a no-op since we're reusing the same
704                    // buffer) than it is to pass around info about which sinks are having their
705                    // buffers reused and treat them differently at other stages.
706                    let tx = buffer_tx.remove(key).unwrap();
707                    let rx = match buffer {
708                        TaskOutput::Sink(rx) => rx.into_inner(),
709                        _ => unreachable!(),
710                    };
711
712                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
713                }
714            }
715        }
716
717        buffers
718    }
719
720    /// Connects all changed/added components in the given configuration diff.
721    pub(crate) async fn connect_diff(
722        &mut self,
723        diff: &ConfigDiff,
724        new_pieces: &mut TopologyPieces,
725    ) {
726        debug!("Connecting changed/added component(s).");
727
728        // Update tap metadata
729        if !self.watch.0.is_closed() {
730            for key in &diff.sources.to_remove {
731                // Sources only have outputs
732                self.outputs_tap_metadata.remove(key);
733                self.component_type_names.remove(key);
734            }
735
736            for key in &diff.transforms.to_remove {
737                // Transforms can have both inputs and outputs
738                self.outputs_tap_metadata.remove(key);
739                self.inputs_tap_metadata.remove(key);
740                self.component_type_names.remove(key);
741            }
742
743            for key in &diff.sinks.to_remove {
744                // Sinks only have inputs
745                self.inputs_tap_metadata.remove(key);
746                self.component_type_names.remove(key);
747            }
748
749            for key in &diff.enrichment_tables.sinks.to_remove {
750                // Sinks only have inputs
751                self.inputs_tap_metadata.remove(key);
752                self.component_type_names.remove(key);
753            }
754
755            for key in &diff.enrichment_tables.sources.to_remove {
756                // Sources only have outputs
757                self.outputs_tap_metadata.remove(key);
758                self.component_type_names.remove(key);
759            }
760
761            for key in diff.sources.changed_and_added() {
762                if let Some(task) = new_pieces.tasks.get(key) {
763                    let typetag = task.typetag().to_string();
764                    self.outputs_tap_metadata
765                        .insert(key.clone(), ("source", typetag.clone()));
766                    self.component_type_names.insert(key.clone(), typetag);
767                }
768            }
769
770            for key in diff.enrichment_tables.sources.changed_and_added() {
771                if let Some(task) = new_pieces.tasks.get(key) {
772                    self.outputs_tap_metadata
773                        .insert(key.clone(), ("source", task.typetag().to_string()));
774                    self.component_type_names
775                        .insert(key.clone(), task.typetag().to_string());
776                }
777            }
778
779            for key in diff.transforms.changed_and_added() {
780                if let Some(task) = new_pieces.tasks.get(key) {
781                    let typetag = task.typetag().to_string();
782                    self.outputs_tap_metadata
783                        .insert(key.clone(), ("transform", typetag.clone()));
784                    self.component_type_names.insert(key.clone(), typetag);
785                }
786            }
787
788            for key in diff.sinks.changed_and_added() {
789                if let Some(task) = new_pieces.tasks.get(key) {
790                    self.component_type_names
791                        .insert(key.clone(), task.typetag().to_string());
792                }
793            }
794
795            for key in diff.enrichment_tables.sinks.changed_and_added() {
796                if let Some(task) = new_pieces.tasks.get(key) {
797                    self.component_type_names
798                        .insert(key.clone(), task.typetag().to_string());
799                }
800            }
801
802            for (key, input) in &new_pieces.inputs {
803                self.inputs_tap_metadata
804                    .insert(key.clone(), input.1.clone());
805            }
806        }
807
808        // We configure the outputs of any changed/added sources first, so they're available to any
809        // transforms and sinks that come afterwards.
810        for key in diff.sources.changed_and_added() {
811            debug!(component_id = %key, "Configuring outputs for source.");
812            self.setup_outputs(key, new_pieces).await;
813        }
814
815        let added_changed_table_sources: Vec<ComponentKey> = diff
816            .enrichment_tables
817            .sources
818            .changed_and_added()
819            .cloned()
820            .collect();
821        for key in &added_changed_table_sources {
822            debug!(component_id = %key, "Connecting outputs for enrichment table source.");
823            self.setup_outputs(key, new_pieces).await;
824        }
825
826        // We configure the outputs of any changed/added transforms next, for the same reason: we
827        // need them to be available to any transforms and sinks that come afterwards.
828        for key in diff.transforms.changed_and_added() {
829            debug!(component_id = %key, "Configuring outputs for transform.");
830            self.setup_outputs(key, new_pieces).await;
831        }
832
833        // Now that all possible outputs are configured, we can start wiring up inputs, starting
834        // with transforms.
835        for key in diff.transforms.changed_and_added() {
836            debug!(component_id = %key, "Connecting inputs for transform.");
837            self.setup_inputs(key, diff, new_pieces).await;
838        }
839
840        // Now that all sources and transforms are fully configured, we can wire up sinks.
841        for key in diff.sinks.changed_and_added() {
842            debug!(component_id = %key, "Connecting inputs for sink.");
843            self.setup_inputs(key, diff, new_pieces).await;
844        }
845        let added_changed_tables: Vec<ComponentKey> = diff
846            .enrichment_tables
847            .sinks
848            .changed_and_added()
849            .cloned()
850            .collect();
851        for key in &added_changed_tables {
852            debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
853            self.setup_inputs(key, diff, new_pieces).await;
854        }
855
856        // We do a final pass here to reconnect unchanged components.
857        //
858        // Why would we reconnect unchanged components?  Well, as sources and transforms will
859        // recreate their fanouts every time they're changed, we can run into a situation where a
860        // transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
861        // we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
862        //
863        // Instead of propagating connections forward -- B reconnecting A forcefully -- we only
864        // connect components backwards i.e. transforms to sources/transforms, and sinks to
865        // sources/transforms, to ensure we're connecting components in order.
866        self.reattach_severed_inputs(diff);
867
868        // Broadcast any topology changes to subscribers.
869        if !self.watch.0.is_closed() {
870            let outputs = self
871                .outputs
872                .clone()
873                .into_iter()
874                .flat_map(|(output_id, control_tx)| {
875                    self.outputs_tap_metadata.get(&output_id.component).map(
876                        |(component_kind, component_type)| {
877                            (
878                                TapOutput {
879                                    output_id,
880                                    component_kind,
881                                    component_type: component_type.clone(),
882                                },
883                                control_tx,
884                            )
885                        },
886                    )
887                })
888                .collect::<HashMap<_, _>>();
889
890            let mut removals = diff.sources.to_remove.clone();
891            removals.extend(diff.transforms.to_remove.iter().cloned());
892            self.watch
893                .0
894                .send(TapResource {
895                    outputs,
896                    inputs: self.inputs_tap_metadata.clone(),
897                    source_keys: diff
898                        .sources
899                        .changed_and_added()
900                        .map(|key| key.to_string())
901                        .chain(
902                            added_changed_table_sources
903                                .iter()
904                                .map(|key| key.to_string()),
905                        )
906                        .collect(),
907                    sink_keys: diff
908                        .sinks
909                        .changed_and_added()
910                        .map(|key| key.to_string())
911                        .chain(added_changed_tables.iter().map(|key| key.to_string()))
912                        .collect(),
913                    // Note, only sources and transforms are relevant. Sinks do
914                    // not have outputs to tap.
915                    removals,
916                    type_names: self
917                        .component_type_names
918                        .iter()
919                        .map(|(k, v)| (k.to_string(), v.clone()))
920                        .collect(),
921                })
922                .expect("Couldn't broadcast config changes.");
923        }
924    }
925
926    async fn setup_outputs(
927        &mut self,
928        key: &ComponentKey,
929        new_pieces: &mut builder::TopologyPieces,
930    ) {
931        let outputs = new_pieces.outputs.remove(key).unwrap();
932        for (port, output) in outputs {
933            debug!(component_id = %key, output_id = ?port, "Configuring output for component.");
934
935            let id = OutputId {
936                component: key.clone(),
937                port,
938            };
939
940            self.outputs.insert(id, output);
941        }
942    }
943
944    async fn setup_inputs(
945        &mut self,
946        key: &ComponentKey,
947        diff: &ConfigDiff,
948        new_pieces: &mut builder::TopologyPieces,
949    ) {
950        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
951
952        let old_inputs = self
953            .config
954            .inputs_for_node(key)
955            .into_iter()
956            .flatten()
957            .cloned()
958            .collect::<HashSet<_>>();
959
960        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
961        let inputs_to_add = &new_inputs - &old_inputs;
962
963        for input in inputs {
964            let output = self.outputs.get_mut(&input).expect("unknown output");
965
966            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
967                // If the input we're connecting to is changing, that means its outputs will have been
968                // recreated, so instead of replacing a paused sink, we have to add it to this new
969                // output for the first time, since there's nothing to actually replace at this point.
970                debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");
971
972                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
973            } else {
974                // We know that if this component is connected to a given input, and neither
975                // components were changed, then the output must still exist, which means we paused
976                // this component's connection to its output, so we have to replace that connection
977                // now:
978                debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");
979
980                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
981            }
982        }
983
984        self.inputs.insert(key.clone(), tx);
985        new_pieces
986            .detach_triggers
987            .remove(key)
988            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
989    }
990
991    fn remove_outputs(&mut self, key: &ComponentKey) {
992        self.outputs.retain(|id, _output| &id.component != key);
993    }
994
995    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
996        self.inputs.remove(key);
997        self.detach_triggers.remove(key);
998
999        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
1000        let new_inputs = new_config
1001            .inputs_for_node(key)
1002            .unwrap_or_default()
1003            .iter()
1004            .collect::<HashSet<_>>();
1005
1006        for input in old_inputs {
1007            if let Some(output) = self.outputs.get_mut(input) {
1008                if diff.contains(&input.component)
1009                    || diff.is_removed(key)
1010                    || !new_inputs.contains(input)
1011                {
1012                    // 3 cases to remove the input:
1013                    //
1014                    // Case 1: If the input we're removing ourselves from is changing, that means its
1015                    // outputs will be recreated, so instead of pausing the sink, we just delete it
1016                    // outright to ensure things are clean.
1017                    //
1018                    // Case 2: If this component itself is being removed, then pausing makes no sense
1019                    // because it isn't coming back.
1020                    //
1021                    // Case 3: This component is no longer connected to the input from new config.
1022                    debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");
1023
1024                    _ = output.send(ControlMessage::Remove(key.clone()));
1025                } else {
1026                    // We know that if this component is connected to a given input, and it isn't being
1027                    // changed, then it will exist when we reconnect inputs, so we should pause it
1028                    // now to pause further sends through that component until we reconnect:
1029                    debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");
1030
1031                    _ = output.send(ControlMessage::Pause(key.clone()));
1032                }
1033            }
1034        }
1035    }
1036
1037    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
1038        let unchanged_transforms = self
1039            .config
1040            .transforms()
1041            .filter(|(key, _)| !diff.transforms.contains(key));
1042        for (transform_key, transform) in unchanged_transforms {
1043            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
1044            for output_id in changed_outputs {
1045                debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1046
1047                let input = self.inputs.get(transform_key).cloned().unwrap();
1048                let output = self.outputs.get_mut(&output_id).unwrap();
1049                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
1050            }
1051        }
1052
1053        let unchanged_table_sinks = self
1054            .config
1055            .enrichment_tables()
1056            .filter_map(|(key, table)| table.as_sink(key))
1057            .filter(|(key, _)| !diff.enrichment_tables.sinks.contains(key))
1058            .collect::<Vec<_>>();
1059        let unchanged_sinks = self
1060            .config
1061            .sinks()
1062            .filter(|(key, _)| !diff.sinks.contains(key));
1063        for (sink_key, sink) in
1064            unchanged_sinks.chain(unchanged_table_sinks.iter().map(|(k, v)| (k, v)))
1065        {
1066            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
1067            for output_id in changed_outputs {
1068                debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1069
1070                let input = self.inputs.get(sink_key).cloned().unwrap();
1071                let output = self.outputs.get_mut(&output_id).unwrap();
1072                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
1073            }
1074        }
1075    }
1076
1077    /// Starts any new or changed components in the given configuration diff.
1078    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
1079        for key in &diff.sources.to_change {
1080            debug!(message = "Spawning changed source.", component_id = %key);
1081            self.spawn_source(key, &mut new_pieces);
1082        }
1083
1084        for key in &diff.sources.to_add {
1085            debug!(message = "Spawning new source.", component_id = %key);
1086            self.spawn_source(key, &mut new_pieces);
1087        }
1088
1089        let changed_table_sources: Vec<&ComponentKey> = diff
1090            .enrichment_tables
1091            .sources
1092            .to_change
1093            .iter()
1094            .filter(|k| new_pieces.source_tasks.contains_key(k))
1095            .collect();
1096
1097        let added_table_sources: Vec<&ComponentKey> = diff
1098            .enrichment_tables
1099            .sources
1100            .to_add
1101            .iter()
1102            .filter(|k| new_pieces.source_tasks.contains_key(k))
1103            .collect();
1104
1105        for key in changed_table_sources {
1106            debug!(message = "Spawning changed enrichment table source.", component_id = %key);
1107            self.spawn_source(key, &mut new_pieces);
1108        }
1109
1110        for key in added_table_sources {
1111            debug!(message = "Spawning new enrichment table source.", component_id = %key);
1112            self.spawn_source(key, &mut new_pieces);
1113        }
1114
1115        for key in &diff.transforms.to_change {
1116            debug!(message = "Spawning changed transform.", component_id = %key);
1117            self.spawn_transform(key, &mut new_pieces);
1118        }
1119
1120        for key in &diff.transforms.to_add {
1121            debug!(message = "Spawning new transform.", component_id = %key);
1122            self.spawn_transform(key, &mut new_pieces);
1123        }
1124
1125        for key in &diff.sinks.to_change {
1126            debug!(message = "Spawning changed sink.", component_id = %key);
1127            self.spawn_sink(key, &mut new_pieces);
1128        }
1129
1130        for key in &diff.sinks.to_add {
1131            trace!(message = "Spawning new sink.", component_id = %key);
1132            self.spawn_sink(key, &mut new_pieces);
1133        }
1134
1135        let changed_tables: Vec<&ComponentKey> = diff
1136            .enrichment_tables
1137            .sinks
1138            .to_change
1139            .iter()
1140            .filter(|k| new_pieces.tasks.contains_key(k))
1141            .collect();
1142
1143        let added_tables: Vec<&ComponentKey> = diff
1144            .enrichment_tables
1145            .sinks
1146            .to_add
1147            .iter()
1148            .filter(|k| new_pieces.tasks.contains_key(k))
1149            .collect();
1150
1151        for key in changed_tables {
1152            debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
1153            self.spawn_sink(key, &mut new_pieces);
1154        }
1155
1156        for key in added_tables {
1157            debug!(message = "Spawning enrichment table new sink.", component_id = %key);
1158            self.spawn_sink(key, &mut new_pieces);
1159        }
1160    }
1161
1162    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1163        let task = new_pieces.tasks.remove(key).unwrap();
1164        let span = error_span!(
1165            "sink",
1166            component_kind = "sink",
1167            component_id = %task.id(),
1168            component_type = %task.typetag(),
1169        );
1170
1171        let task_span = span.or_current();
1172        #[cfg(unix)]
1173        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1174            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1175                task.id().to_string(),
1176                "sink".to_string(),
1177                task.typetag().to_string(),
1178            );
1179            debug!(
1180                component_kind = "sink",
1181                component_type = task.typetag(),
1182                component_id = task.id(),
1183                group_id = group_id.as_raw().to_string(),
1184                "Registered new allocation group."
1185            );
1186            group_id.attach_to_span(&task_span);
1187        }
1188
1189        let task_name = format!(">> {} ({})", task.typetag(), task.id());
1190        let task = {
1191            let key = key.clone();
1192            handle_errors(task, self.abort_tx.clone(), |error| {
1193                ShutdownError::SinkAborted { key, error }
1194            })
1195        }
1196        .instrument(task_span);
1197        let spawned = spawn_named(task, task_name.as_ref());
1198        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1199            drop(previous); // detach and forget
1200        }
1201    }
1202
1203    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1204        let task = new_pieces.tasks.remove(key).unwrap();
1205        let span = error_span!(
1206            "transform",
1207            component_kind = "transform",
1208            component_id = %task.id(),
1209            component_type = %task.typetag(),
1210        );
1211
1212        let task_span = span.or_current();
1213        #[cfg(unix)]
1214        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1215            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1216                task.id().to_string(),
1217                "transform".to_string(),
1218                task.typetag().to_string(),
1219            );
1220            debug!(
1221                component_kind = "transform",
1222                component_type = task.typetag(),
1223                component_id = task.id(),
1224                group_id = group_id.as_raw().to_string(),
1225                "Registered new allocation group."
1226            );
1227            group_id.attach_to_span(&task_span);
1228        }
1229
1230        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1231        let task = {
1232            let key = key.clone();
1233            handle_errors(task, self.abort_tx.clone(), |error| {
1234                ShutdownError::TransformAborted { key, error }
1235            })
1236        }
1237        .instrument(task_span);
1238        let spawned = spawn_named(task, task_name.as_ref());
1239        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1240            drop(previous); // detach and forget
1241        }
1242    }
1243
1244    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1245        let task = new_pieces.tasks.remove(key).unwrap();
1246        let span = error_span!(
1247            "source",
1248            component_kind = "source",
1249            component_id = %task.id(),
1250            component_type = %task.typetag(),
1251        );
1252
1253        let task_span = span.or_current();
1254        #[cfg(unix)]
1255        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1256            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1257                task.id().to_string(),
1258                "source".to_string(),
1259                task.typetag().to_string(),
1260            );
1261
1262            debug!(
1263                component_kind = "source",
1264                component_type = task.typetag(),
1265                component_id = task.id(),
1266                group_id = group_id.as_raw().to_string(),
1267                "Registered new allocation group."
1268            );
1269            group_id.attach_to_span(&task_span);
1270        }
1271
1272        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1273        let task = {
1274            let key = key.clone();
1275            handle_errors(task, self.abort_tx.clone(), |error| {
1276                ShutdownError::SourceAborted { key, error }
1277            })
1278        }
1279        .instrument(task_span.clone());
1280        let spawned = spawn_named(task, task_name.as_ref());
1281        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1282            drop(previous); // detach and forget
1283        }
1284
1285        self.shutdown_coordinator
1286            .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1287
1288        // Now spawn the actual source task.
1289        let source_task = new_pieces.source_tasks.remove(key).unwrap();
1290        let source_task = {
1291            let key = key.clone();
1292            handle_errors(source_task, self.abort_tx.clone(), |error| {
1293                ShutdownError::SourceAborted { key, error }
1294            })
1295        }
1296        .instrument(task_span);
1297        self.source_tasks
1298            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1299    }
1300
1301    pub async fn start_init_validated(
1302        config: Config,
1303        extra_context: ExtraContext,
1304    ) -> Option<(Self, ShutdownErrorReceiver)> {
1305        let diff = ConfigDiff::initial(&config);
1306        let pieces = TopologyPiecesBuilder::new(&config, &diff)
1307            .with_extra_context(extra_context)
1308            .build_or_log_errors()
1309            .await?;
1310        Self::start_validated(config, diff, pieces).await
1311    }
1312
1313    pub async fn start_validated(
1314        config: Config,
1315        diff: ConfigDiff,
1316        mut pieces: TopologyPieces,
1317    ) -> Option<(Self, ShutdownErrorReceiver)> {
1318        let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1319
1320        let expire_metrics = match (
1321            config.global.expire_metrics,
1322            config.global.expire_metrics_secs,
1323        ) {
1324            (Some(e), None) => {
1325                warn!(
1326                    "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1327                );
1328                if e < Duration::from_secs(0) {
1329                    None
1330                } else {
1331                    Some(e.as_secs_f64())
1332                }
1333            }
1334            (Some(_), Some(_)) => {
1335                error!(
1336                    message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
1337                    internal_log_rate_limit = false
1338                );
1339                return None;
1340            }
1341            (None, Some(e)) => {
1342                if e < 0f64 {
1343                    None
1344                } else {
1345                    Some(e)
1346                }
1347            }
1348            (None, None) => Some(300f64),
1349        };
1350
1351        if let Err(error) = crate::metrics::Controller::get()
1352            .expect("Metrics must be initialized")
1353            .set_expiry(
1354                expire_metrics,
1355                config
1356                    .global
1357                    .expire_metrics_per_metric_set
1358                    .clone()
1359                    .unwrap_or_default(),
1360            )
1361        {
1362            error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
1363            return None;
1364        }
1365
1366        let (utilization_emitter, utilization_registry) = pieces
1367            .utilization
1368            .take()
1369            .expect("Topology is missing the utilization metric emitter!");
1370        let metrics_storage = pieces.metrics_storage.clone();
1371        let metrics_refresh_period = config
1372            .global
1373            .metrics_storage_refresh_period
1374            .map(Duration::from_secs_f64);
1375        let mut running_topology = Self::new(config, abort_tx);
1376
1377        if !running_topology
1378            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1379            .await
1380        {
1381            return None;
1382        }
1383        running_topology.connect_diff(&diff, &mut pieces).await;
1384        running_topology.spawn_diff(&diff, pieces);
1385
1386        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1387            ShutdownSignal::new_wired();
1388        running_topology.utilization_registry = Some(utilization_registry.clone());
1389        running_topology.utilization_task_shutdown_trigger =
1390            Some(utilization_task_shutdown_trigger);
1391        running_topology.utilization_task = Some(tokio::spawn(Task::new(
1392            "utilization_heartbeat".into(),
1393            "",
1394            async move {
1395                utilization_emitter
1396                    .run_utilization(utilization_shutdown_signal)
1397                    .await;
1398                Ok(TaskOutput::Healthcheck)
1399            },
1400        )));
1401        if let Some(metrics_refresh_period) = metrics_refresh_period {
1402            let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
1403                ShutdownSignal::new_wired();
1404            running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
1405            running_topology.metrics_task = Some(tokio::spawn(Task::new(
1406                "metrics_heartbeat".into(),
1407                "",
1408                async move {
1409                    metrics_storage
1410                        .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
1411                        .await;
1412                    Ok(TaskOutput::Healthcheck)
1413                },
1414            )));
1415        }
1416
1417        Some((running_topology, abort_rx))
1418    }
1419}
1420
1421fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1422    let mut changed_outputs = Vec::new();
1423
1424    for source_key in diff
1425        .sources
1426        .to_change
1427        .iter()
1428        .chain(diff.enrichment_tables.sources.to_change.iter())
1429    {
1430        changed_outputs.extend(
1431            output_ids
1432                .iter()
1433                .filter(|id| &id.component == source_key)
1434                .cloned(),
1435        );
1436    }
1437
1438    for transform_key in &diff.transforms.to_change {
1439        changed_outputs.extend(
1440            output_ids
1441                .iter()
1442                .filter(|id| &id.component == transform_key)
1443                .cloned(),
1444        );
1445    }
1446
1447    changed_outputs
1448}
1449
1450fn enrichment_table_sink_resources(config: &Config, sink_key: &ComponentKey) -> Vec<Resource> {
1451    config
1452        .enrichment_tables()
1453        .filter_map(|(table_key, table)| table.as_sink(table_key))
1454        .find(|(key, _)| key == sink_key)
1455        .map(|(key, sink)| sink.resources(&key))
1456        .unwrap_or_default()
1457}
1458
1459fn enrichment_table_sink_buffer(
1460    config: &Config,
1461    sink_key: &ComponentKey,
1462) -> Option<vector_lib::buffers::BufferConfig> {
1463    config
1464        .enrichment_tables()
1465        .filter_map(|(table_key, table)| table.as_sink(table_key))
1466        .find(|(key, _)| key == sink_key)
1467        .map(|(_, sink)| sink.buffer)
1468}