// vector/topology/builder.rs

1use std::{
2    collections::HashMap,
3    future::ready,
4    num::NonZeroUsize,
5    sync::{
6        Arc, LazyLock, Mutex,
7        atomic::{AtomicUsize, Ordering},
8    },
9    time::Instant,
10};
11
12use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
13use futures_util::stream::FuturesUnordered;
14use metrics::Counter;
15use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
16use tokio::{
17    select,
18    sync::{Mutex as AsyncMutex, mpsc::UnboundedSender, oneshot},
19    time::timeout,
20};
21use tracing::{Instrument, Span};
22use vector_lib::{
23    EstimatedJsonEncodedSizeOf,
24    buffers::{
25        BufferType, WhenFull,
26        topology::{
27            builder::TopologyBuilder,
28            channel::{
29                BufferChannelKind, BufferReceiver, BufferSender, ChannelMetricMetadata,
30                LimitedReceiver,
31            },
32        },
33    },
34    enrichment::Table,
35    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
36    latency::LatencyRecorder,
37    schema::Definition,
38    source_sender::{DEFAULT_CHUNK_SIZE_EVENTS, SourceSenderItem},
39    transform::update_runtime_schema_definition,
40};
41use vector_lib::{gauge, internal_event::GaugeName};
42use vector_vrl_metrics::MetricsStorage;
43
44use super::{
45    BuiltBuffer, ConfigDiff,
46    fanout::{self, Fanout},
47    schema,
48    task::{Task, TaskOutput, TaskResult},
49};
50use crate::{
51    SourceSender,
52    config::{
53        ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
54        ProxyConfig, SinkContext, SinkOuter, SourceContext, SourceOuter, TransformContext,
55        TransformOuter, TransformOutput,
56    },
57    cpu_time::{CpuTimedExt, spawn_timed},
58    event::{EventArray, EventContainer},
59    extra_context::ExtraContext,
60    internal_events::EventsReceived,
61    shutdown::SourceShutdownCoordinator,
62    spawn_named,
63    topology::task::TaskError,
64    transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
65    utilization::{
66        OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
67        UtilizationRegistry,
68    },
69};
70
71static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
72    LazyLock::new(vector_lib::enrichment::TableRegistry::default);
73// `TableRegistry::load` and `finish_load` are separate operations on a process-global registry.
74// Keep topology builds and enrichment-table reloads from interleaving that transition.
75static ENRICHMENT_TABLES_LOAD_LOCK: LazyLock<AsyncMutex<()>> = LazyLock::new(AsyncMutex::default);
76static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);
77
78// Resolved once at startup in `crate::app::build_runtime`, together with `chunk_size_events`. `0`
79// means startup didn't run (e.g. tests that bypass it), in which case the getters fall back to the
80// `DEFAULT_CHUNK_SIZE_EVENTS`-based values.
81static SOURCE_SENDER_BUFFER_SIZE: AtomicUsize = AtomicUsize::new(0);
82static READY_ARRAY_CAPACITY: AtomicUsize = AtomicUsize::new(0);
83
84/// How many `chunk_size_events`-sized batches the concurrent transform runner's `ReadyArrays`
85/// buffers before applying backpressure: `ready_array_capacity = chunk_size_events * this`.
86pub(crate) const READY_ARRAY_CAPACITY_CHUNKS: usize = 4;
87
88/// Returns the source sender output buffer base size.
89pub(crate) fn source_sender_buffer_size() -> usize {
90    match SOURCE_SENDER_BUFFER_SIZE.load(Ordering::Relaxed) {
91        0 => *TRANSFORM_CONCURRENCY_LIMIT * DEFAULT_CHUNK_SIZE_EVENTS,
92        size => size,
93    }
94}
95
96/// Sets the process-wide source sender buffer size. Must be called at most once, before the
97/// topology is built. Panics if called more than once.
98pub(crate) fn set_source_sender_buffer_size(size: usize) {
99    SOURCE_SENDER_BUFFER_SIZE
100        .compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed)
101        .unwrap_or_else(|_| panic!("double source_sender_buffer_size initialization"));
102}
103
104/// Returns the `ReadyArrays` capacity used by concurrent transform runners.
105pub(crate) fn ready_array_capacity() -> NonZeroUsize {
106    let size = match READY_ARRAY_CAPACITY.load(Ordering::Relaxed) {
107        0 => DEFAULT_CHUNK_SIZE_EVENTS * READY_ARRAY_CAPACITY_CHUNKS,
108        size => size,
109    };
110    NonZeroUsize::new(size).expect("ready array capacity is non-zero")
111}
112
113/// Sets the process-wide concurrent transform runner `ReadyArrays` capacity. Must be called at most
114/// once, before the topology is built. Panics if called more than once.
115pub(crate) fn set_ready_array_capacity(size: usize) {
116    READY_ARRAY_CAPACITY
117        .compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed)
118        .unwrap_or_else(|_| panic!("double ready_array_capacity initialization"));
119}
120
121pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
122
123static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
124    crate::app::worker_threads()
125        .map(std::num::NonZeroUsize::get)
126        .unwrap_or_else(crate::num_threads)
127});
128
129const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
130
/// Accumulates the pieces of a topology while building the components that are new in a
/// `ConfigDiff`.
///
/// Most build failures are pushed onto `errors` instead of aborting immediately, so a single
/// build pass can report several failing components.
struct Builder<'a> {
    /// The full configuration being built.
    config: &'a super::Config,
    /// Which components are new relative to the running topology; only those are built.
    diff: &'a ConfigDiff,
    /// Each built source is registered here to obtain its shutdown signal and force-shutdown
    /// tripwire.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Human-readable errors collected across the whole build.
    errors: Vec<String>,
    /// Control sender of every output's fanout, keyed by output.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    /// Component tasks keyed by component (source pump supervisors, transform tasks and,
    /// presumably, sink tasks).
    tasks: HashMap<ComponentKey, Task>,
    /// Already-built sink buffers; a sink reuses its entry (removing it) instead of building a
    /// new buffer.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Input channel sender plus declared inputs per component (filled for transforms here;
    /// presumably for sinks as well).
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Sink healthcheck tasks keyed by component (NOTE(review): filled in code not shown here).
    healthchecks: HashMap<ComponentKey, Task>,
    /// Triggers that can stop a sink's input stream (NOTE(review): filled in code not shown here).
    detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Extra context cloned into every component's build context.
    extra_context: ExtraContext,
    /// `Some` only when this builder created its own emitter/registry pair in `new`.
    utilization_emitter: Option<UtilizationEmitter>,
    /// Registry that sinks register their utilization gauges with.
    utilization_registry: UtilizationRegistry,
}
146
147impl<'a> Builder<'a> {
148    fn new(
149        config: &'a super::Config,
150        diff: &'a ConfigDiff,
151        buffers: HashMap<ComponentKey, BuiltBuffer>,
152        extra_context: ExtraContext,
153        utilization_registry: Option<UtilizationRegistry>,
154    ) -> Self {
155        // If registry is not passed, we need to build a whole new utilization emitter + registry
156        // Otherwise, we just store the registry and reuse it for this build
157        let (emitter, registry) = if let Some(registry) = utilization_registry {
158            (None, registry)
159        } else {
160            let (emitter, registry) = UtilizationEmitter::new();
161            (Some(emitter), registry)
162        };
163        Self {
164            config,
165            diff,
166            buffers,
167            shutdown_coordinator: SourceShutdownCoordinator::default(),
168            errors: vec![],
169            outputs: HashMap::new(),
170            tasks: HashMap::new(),
171            inputs: HashMap::new(),
172            healthchecks: HashMap::new(),
173            detach_triggers: HashMap::new(),
174            extra_context,
175            utilization_emitter: emitter,
176            utilization_registry: registry,
177        }
178    }
179
180    /// Builds the new pieces of the topology found in `self.diff`.
181    async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
182        let _enrichment_tables_load_guard = ENRICHMENT_TABLES_LOAD_LOCK.lock().await;
183        let enrichment_tables = self.load_enrichment_tables().await;
184        let source_tasks = self.build_sources(enrichment_tables).await;
185        self.build_transforms(enrichment_tables).await;
186        self.build_sinks(enrichment_tables).await;
187
188        if self.errors.is_empty() {
189            // We should have all the data for the enrichment tables loaded now, so switch them over to
190            // readonly.
191            enrichment_tables.finish_load();
192
193            Ok(TopologyPieces {
194                inputs: self.inputs,
195                outputs: Self::finalize_outputs(self.outputs),
196                tasks: self.tasks,
197                source_tasks,
198                healthchecks: self.healthchecks,
199                shutdown_coordinator: self.shutdown_coordinator,
200                detach_triggers: self.detach_triggers,
201                metrics_storage: METRICS_STORAGE.clone(),
202                utilization: self
203                    .utilization_emitter
204                    .map(|e| (e, self.utilization_registry)),
205            })
206        } else {
207            Err(self.errors)
208        }
209    }
210
211    fn finalize_outputs(
212        outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
213    ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
214    {
215        let mut finalized_outputs = HashMap::new();
216        for (id, output) in outputs {
217            let entry = finalized_outputs
218                .entry(id.component)
219                .or_insert_with(HashMap::new);
220            entry.insert(id.port, output);
221        }
222
223        finalized_outputs
224    }
225
    /// Loads, or reloads the enrichment tables.
    /// The tables are stored in the `ENRICHMENT_TABLES` global variable.
    ///
    /// A table is rebuilt when the registry reports it needs a reload, or when the diff marks it
    /// as changed or added; other configured tables are not rebuilt here. `build` calls this
    /// while holding `ENRICHMENT_TABLES_LOAD_LOCK`. The caller is expected to call
    /// `finish_load` on the returned registry once the rest of the topology has built.
    ///
    /// Errors from building a table are pushed onto `self.errors`. A failure to re-add an index
    /// is only logged, and that table is left out of the map passed to `load`.
    /// NOTE(review): what `TableRegistry::load` does with tables absent from the map
    /// (presumably keeps the previously loaded data, per the comment below) is not visible here.
    async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
        let mut enrichment_tables: HashMap<String, Box<dyn Table + Send + Sync>> = HashMap::new();

        // Build enrichment tables
        'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
            let table_name = name.to_string();
            if ENRICHMENT_TABLES.needs_reload(&table_name)
                || self.diff.enrichment_tables.tables.is_changed(name)
                || self.diff.enrichment_tables.tables.is_added(name)
            {
                // Indexes are registered dynamically by VRL lookups against the global
                // enrichment table registry. Preserve any existing registry indexes even
                // when this config diff sees the table as newly added.
                let indexes = ENRICHMENT_TABLES.index_fields(&table_name);

                // Only pre-existing tables that ask for it get their previous state handed to the
                // new build. NOTE(review): the state is extracted before `build` runs. Whether the
                // old table keeps it when the build or the index step below fails depends on
                // `extract_state` semantics (confirm).
                let mut prev_state = None;
                if !self.diff.enrichment_tables.tables.is_added(name)
                    && table_outer.inner.wants_previous_state()
                {
                    prev_state = ENRICHMENT_TABLES.extract_state(&table_name);
                }

                let mut table = match table_outer
                    .inner
                    .build(&self.config.global, prev_state)
                    .await
                {
                    Ok(table) => table,
                    Err(error) => {
                        self.errors
                            .push(format!("Enrichment Table \"{name}\": {error}"));
                        continue;
                    }
                };

                // Re-create every index the previous registry knew about on the new table.
                for (case, index) in indexes {
                    match table
                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
                    {
                        Ok(_) => (),
                        Err(error) => {
                            // If there is an error adding an index we do not want to use the reloaded
                            // data, the previously loaded data will still need to be used.
                            // Just report the error and continue.
                            error!(message = "Unable to add index to reloaded enrichment table.",
                                table = ?name.to_string(),
                                %error);
                            continue 'tables;
                        }
                    }
                }

                enrichment_tables.insert(table_name, table);
            }
        }

        ENRICHMENT_TABLES.load(enrichment_tables);

        &ENRICHMENT_TABLES
    }
288
289    async fn build_sources(
290        &mut self,
291        enrichment_tables: &vector_lib::enrichment::TableRegistry,
292    ) -> HashMap<ComponentKey, Task> {
293        let mut source_tasks = HashMap::new();
294
295        let table_sources = self
296            .config
297            .enrichment_tables
298            .iter()
299            .filter_map(|(key, table)| table.as_source(key))
300            .collect::<Vec<_>>();
301        for (key, source) in self
302            .config
303            .sources()
304            .filter(|(key, _)| self.diff.sources.contains_new(key))
305            .chain(
306                table_sources
307                    .iter()
308                    .map(|(key, source)| (key, source))
309                    .filter(|(key, _)| self.diff.enrichment_tables.sources.contains_new(key)),
310            )
311        {
312            debug!(component_id = %key, "Building new source.");
313
314            let span = error_span!(
315                "source",
316                component_kind = "source",
317                component_id = %key.id(),
318                component_type = %source.inner.get_component_name(),
319            );
320
321            if let Ok(server) = self
322                .build_instrumented_source(key, source, enrichment_tables)
323                .instrument(span)
324                .await
325            {
326                source_tasks.insert(key.clone(), server);
327            }
328        }
329
330        source_tasks
331    }
332
    /// Builds one new source (called inside its tracing span) together with its output pumps.
    ///
    /// For every declared output this adds an output channel to the `SourceSender` builder and
    /// creates a `Fanout` plus its control sender. It also creates a pump future (via
    /// `run_source_output_pump`) fed by that channel. All pumps run under one supervisor task that
    /// is registered in `self.tasks`. The source's own server future is returned as the `Ok` task,
    /// which `build_sources` collects separately.
    ///
    /// If the source fails to build, the error is pushed onto `self.errors` and `Err(())` is
    /// returned. Neither the fanout controls nor the pump task are registered in that case. The
    /// source has, however, already been registered with the shutdown coordinator.
    async fn build_instrumented_source(
        &mut self,
        key: &ComponentKey,
        source: &SourceOuter,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> Result<Task, ()> {
        let typetag = source.inner.get_component_name();
        let source_outputs = source.inner.outputs(self.config.schema.log_namespace());

        // Name given to every spawned per-output pump task.
        let task_name = format!(
            ">> {} ({}, pump) >>",
            source.inner.get_component_name(),
            key.id()
        );

        let mut builder = SourceSender::builder()
            .with_buffer(source_sender_buffer_size())
            .with_timeout(source.inner.send_timeout())
            .with_ewma_half_life_seconds(
                self.config.global.buffer_utilization_ewma_half_life_seconds,
            );
        let mut pumps = Vec::new();
        let mut controls = HashMap::new();
        let mut schema_definitions = HashMap::with_capacity(source_outputs.len());

        for output in source_outputs.into_iter() {
            let rx = builder.add_source_output(output.clone(), key.clone());

            let (fanout, control) = Fanout::new(key.clone());
            let source_type = source.inner.get_component_name();
            // NOTE: this shadows the `source` parameter for the rest of this iteration only.
            let source = Arc::new(key.clone());

            let pump = run_source_output_pump(rx, fanout, source, source_type);

            pumps.push(pump.instrument(Span::current()));
            controls.insert(
                OutputId {
                    component: key.clone(),
                    port: output.port.clone(),
                },
                control,
            );

            let port = output.port.clone();
            if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
                schema_definitions.insert(port, definition);
            }
        }

        // Carries the first pump error from the supervisor to the source wrapper future below.
        let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
        let pump = async move {
            debug!("Source pump supervisor starting.");

            // Spawn all of the per-output pumps and then await their completion.
            //
            // If any of the pumps complete with an error, or panic/are cancelled, we return
            // immediately.
            let mut handles = FuturesUnordered::new();
            for pump in pumps {
                handles.push(spawn_named(pump, task_name.as_ref()));
            }

            // NOTE(review): a join failure (presumably a panic or cancellation) is returned via
            // `?` without sending on `pump_error_tx`. The dropped sender does not match the
            // `Ok(e)` arm in the wrapper below, so the source keeps running in that case.
            let mut had_pump_error = false;
            while let Some(output) = handles.try_next().await? {
                if let Err(e) = output {
                    // Immediately send the error to the source's wrapper future, but ignore any
                    // errors during the send, since nested errors wouldn't make any sense here.
                    _ = pump_error_tx.send(e);
                    had_pump_error = true;
                    break;
                }
            }

            if had_pump_error {
                debug!("Source pump supervisor task finished with an error.");
            } else {
                debug!("Source pump supervisor task finished normally.");
            }
            Ok(TaskOutput::Source)
        };
        let pump = Task::new(key.clone(), typetag, pump);

        let (shutdown_signal, force_shutdown_tripwire) = self
            .shutdown_coordinator
            .register_source(key, INTERNAL_SOURCES.contains(&typetag));

        // `out` consumes the sender builder, including every output channel added above.
        let context = SourceContext {
            key: key.clone(),
            globals: self.config.global.clone(),
            enrichment_tables: enrichment_tables.clone(),
            metrics_storage: METRICS_STORAGE.clone(),
            shutdown: shutdown_signal,
            out: builder.build(),
            proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
            acknowledgements: source.sink_acknowledgements,
            schema_definitions,
            schema: self.config.schema,
            extra_context: self.extra_context.clone(),
        };
        let server = match source.inner.build(context).await {
            Err(error) => {
                self.errors.push(format!("Source \"{key}\": {error}"));
                return Err(());
            }
            Ok(server) => server,
        };

        // Build a wrapper future that drives the actual source future, but returns early if we've
        // been signalled to forcefully shutdown, or if the source pump encounters an error.
        //
        // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully
        // within the allotted time window. This can occur normally for certain sources, like stdin,
        // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time
        // to shutdown unless some input is given.
        let server = async move {
            debug!("Source starting.");

            let mut result = select! {
                biased;

                // We've been told that we must forcefully shut down.
                _ = force_shutdown_tripwire => Ok(()),

                // The source pump encountered an error, which we're now bubbling up here to stop
                // the source as well, since the source running makes no sense without the pump.
                //
                // We only match receiving a message, not the error of the sender being dropped,
                // just to keep things simpler.
                Ok(e) = &mut pump_error_rx => Err(e),

                // The source finished normally.
                result = server => result.map_err(|_| TaskError::Opaque),
            };

            // Even though we already tried to receive any pump task error above, we may have exited
            // on the source itself returning an error due to task scheduling, where the pump task
            // encountered an error, sent it over the oneshot, but we were polling the source
            // already and hit an error trying to send to the now-shutdown pump task.
            //
            // Since the error from the source is opaque at the moment (i.e. `()`), we try a final
            // time to see if the pump task encountered an error, using _that_ instead if so, to
            // propagate the true error that caused the source to have to stop.
            if let Ok(e) = pump_error_rx.try_recv() {
                result = Err(e);
            }

            match result {
                Ok(()) => {
                    debug!("Source finished normally.");
                    Ok(TaskOutput::Source)
                }
                Err(e) => {
                    debug!("Source finished with an error.");
                    Err(e)
                }
            }
        };
        let server = Task::new(key.clone(), typetag, server);

        // Only after a successful build are the fanout controls and the pump supervisor exposed.
        self.outputs.extend(controls);
        self.tasks.insert(key.clone(), pump);

        Ok(server)
    }
497
498    async fn build_transforms(
499        &mut self,
500        enrichment_tables: &vector_lib::enrichment::TableRegistry,
501    ) {
502        let mut definition_cache = HashMap::default();
503
504        for (key, transform) in self
505            .config
506            .transforms()
507            .filter(|(key, _)| self.diff.transforms.contains_new(key))
508        {
509            debug!(component_id = %key, "Building new transform.");
510
511            let input_definitions = match schema::input_definitions(
512                &transform.inputs,
513                self.config,
514                enrichment_tables.clone(),
515                &mut definition_cache,
516            ) {
517                Ok(definitions) => definitions,
518                Err(_) => {
519                    // We have received an error whilst retrieving the definitions,
520                    // there is no point in continuing.
521
522                    return;
523                }
524            };
525
526            let span = error_span!(
527                "transform",
528                component_kind = "transform",
529                component_id = %key.id(),
530                component_type = %transform.inner.get_component_name(),
531            );
532
533            self.build_instrumented_transform(key, transform, enrichment_tables, input_definitions)
534                .instrument(span)
535                .await;
536        }
537    }
538
539    async fn build_instrumented_transform(
540        &mut self,
541        key: &ComponentKey,
542        transform: &TransformOuter<OutputId>,
543        enrichment_tables: &vector_lib::enrichment::TableRegistry,
544        input_definitions: Vec<(OutputId, Definition)>,
545    ) {
546        let merged_definition: Definition = input_definitions
547            .iter()
548            .map(|(_output_id, definition)| definition.clone())
549            .reduce(Definition::merge)
550            // We may not have any definitions if all the inputs are from metrics sources.
551            .unwrap_or_else(Definition::any);
552
553        // Create a map of the outputs to the list of possible definitions from those outputs.
554        let schema_definitions = transform
555            .inner
556            .outputs(
557                &TransformContext {
558                    enrichment_tables: enrichment_tables.clone(),
559                    metrics_storage: METRICS_STORAGE.clone(),
560                    schema: self.config.schema,
561                    ..Default::default()
562                },
563                &input_definitions,
564            )
565            .into_iter()
566            .map(|output| {
567                let definitions = output.schema_definitions(self.config.schema.enabled);
568                (output.port, definitions)
569            })
570            .collect::<HashMap<_, _>>();
571
572        let context = TransformContext {
573            key: Some(key.clone()),
574            globals: self.config.global.clone(),
575            enrichment_tables: enrichment_tables.clone(),
576            metrics_storage: METRICS_STORAGE.clone(),
577            schema_definitions,
578            merged_schema_definition: merged_definition.clone(),
579            schema: self.config.schema,
580            extra_context: self.extra_context.clone(),
581            // Resolve the per-component CPU counter inside the transform span so it
582            // picks up component_id/component_kind/component_type tags. The same
583            // handle is shared between the main transform task and any helper
584            // tokio tasks the transform spawns at construction time. On platforms
585            // without per-thread CPU time, `register_counter` returns a noop
586            // handle and the metric is silently omitted.
587            //
588            // `None` when `measure_cpu_usage` is false (the default): no counter
589            // is registered and no per-poll `ThreadTime` measurement takes place.
590            cpu_ns: if transform.measure_cpu_usage {
591                Some(crate::cpu_time::register_counter())
592            } else {
593                None
594            },
595        };
596
597        let node = TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);
598
599        let transform = match transform
600            .inner
601            .build(&context)
602            .instrument(Span::current())
603            .await
604        {
605            Err(error) => {
606                self.errors.push(format!("Transform \"{key}\": {error}"));
607                return;
608            }
609            Ok(transform) => transform,
610        };
611
612        let metrics = ChannelMetricMetadata::new(BufferChannelKind::Transform, None);
613        let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
614            TOPOLOGY_BUFFER_SIZE,
615            WhenFull::Block,
616            &Span::current(),
617            Some(metrics),
618            self.config.global.buffer_utilization_ewma_half_life_seconds,
619        );
620
621        self.inputs
622            .insert(key.clone(), (input_tx, node.inputs.clone()));
623
624        let (transform_task, transform_outputs) = self.build_transform(transform, node, input_rx);
625
626        self.outputs.extend(transform_outputs);
627        self.tasks.insert(key.clone(), transform_task);
628    }
629
630    async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
631        let table_sinks = self
632            .config
633            .enrichment_tables
634            .iter()
635            .filter_map(|(key, table)| table.as_sink(key))
636            .collect::<Vec<_>>();
637        for (key, sink) in self
638            .config
639            .sinks()
640            .filter(|(key, _)| self.diff.sinks.contains_new(key))
641            .chain(
642                table_sinks
643                    .iter()
644                    .map(|(key, sink)| (key, sink))
645                    .filter(|(key, _)| self.diff.enrichment_tables.sinks.contains_new(key)),
646            )
647        {
648            debug!(component_id = %key, "Building new sink.");
649
650            let span = error_span!(
651                "sink",
652                component_kind = "sink",
653                component_id = %key.id(),
654                component_type = %sink.inner.get_component_name(),
655            );
656
657            self.build_instrumented_sink(key, sink, enrichment_tables)
658                .instrument(span)
659                .await;
660        }
661    }
662
663    async fn build_instrumented_sink(
664        &mut self,
665        key: &ComponentKey,
666        sink: &SinkOuter<OutputId>,
667        enrichment_tables: &vector_lib::enrichment::TableRegistry,
668    ) {
669        let sink_inputs = &sink.inputs;
670        let healthcheck = sink.healthcheck();
671        let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
672        let healthcheck_timeout = healthcheck.timeout;
673
674        let typetag = sink.inner.get_component_name();
675        let input_type = sink.inner.input().data_type();
676
677        // At this point, we've validated that all transforms are valid, including any
678        // transform that mutates the schema provided by their sources. We can now validate the
679        // schema expectations of each individual sink.
680        if let Err(mut err) =
681            schema::validate_sink_expectations(key, sink, self.config, enrichment_tables.clone())
682        {
683            self.errors.append(&mut err);
684        };
685
686        let (tx, rx) = match self.buffers.remove(key) {
687            Some(buffer) => buffer,
688            _ => {
689                let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") {
690                    BufferType::Memory { .. } => "memory",
691                    BufferType::DiskV2 { .. } => "disk",
692                };
693                let buffer_span = error_span!("sink", buffer_type);
694                let buffer = sink
695                    .buffer
696                    .build(
697                        self.config.global.data_dir.clone(),
698                        key.to_string(),
699                        buffer_span,
700                    )
701                    .await;
702                match buffer {
703                    Err(error) => {
704                        self.errors.push(format!("Sink \"{key}\": {error}"));
705                        return;
706                    }
707                    Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
708                }
709            }
710        };
711
712        let cx = SinkContext {
713            healthcheck,
714            globals: self.config.global.clone(),
715            enrichment_tables: enrichment_tables.clone(),
716            metrics_storage: METRICS_STORAGE.clone(),
717            proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
718            schema: self.config.schema,
719            app_name: crate::get_app_name().to_string(),
720            app_name_slug: crate::get_slugified_app_name(),
721            extra_context: self.extra_context.clone(),
722        };
723
724        let (sink, healthcheck) = match sink.inner.build(cx).await {
725            Err(error) => {
726                self.errors.push(format!("Sink \"{key}\": {error}"));
727                return;
728            }
729            Ok(built) => built,
730        };
731
732        let (trigger, tripwire) = Tripwire::new();
733
734        let utilization_sender = self
735            .utilization_registry
736            .add_component(key.clone(), gauge!(GaugeName::Utilization));
737        let component_key = key.clone();
738        let sink = async move {
739            debug!("Sink starting.");
740
741            // Why is this Arc<Mutex<Option<_>>> needed you ask.
742            // In case when this function build_pieces errors
743            // this future won't be run so this rx won't be taken
744            // which will enable us to reuse rx to rebuild
745            // old configuration by passing this Arc<Mutex<Option<_>>>
746            // yet again.
747            let rx = rx
748                .lock()
749                .unwrap()
750                .take()
751                .expect("Task started but input has been taken.");
752
753            let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);
754
755            let events_received = register!(EventsReceived);
756            sink.run(
757                rx.by_ref()
758                    .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
759                    .inspect(|events| {
760                        events_received.emit(CountByteSize(
761                            events.len(),
762                            events.estimated_json_encoded_size_of(),
763                        ))
764                    })
765                    .take_until_if(tripwire),
766            )
767            .await
768            .map(|_| {
769                debug!("Sink finished normally.");
770                TaskOutput::Sink(rx)
771            })
772            .map_err(|_| {
773                debug!("Sink finished with an error.");
774                TaskError::Opaque
775            })
776        };
777
778        let task = Task::new(key.clone(), typetag, sink);
779
780        let component_key = key.clone();
781        let healthcheck_task = async move {
782            if enable_healthcheck {
783                timeout(healthcheck_timeout, healthcheck)
784                    .map(|result| match result {
785                        Ok(Ok(_)) => {
786                            info!("Healthcheck passed.");
787                            Ok(TaskOutput::Healthcheck)
788                        }
789                        Ok(Err(error)) => {
790                            error!(
791                                msg = "Healthcheck failed.",
792                                %error,
793                                component_kind = "sink",
794                                component_type = typetag,
795                                component_id = %component_key.id(),
796                            );
797                            Err(TaskError::wrapped(error))
798                        }
799                        Err(e) => {
800                            error!(
801                                msg = "Healthcheck timed out.",
802                                component_kind = "sink",
803                                component_type = typetag,
804                                component_id = %component_key.id(),
805                            );
806                            Err(TaskError::wrapped(Box::new(e)))
807                        }
808                    })
809                    .await
810            } else {
811                info!("Healthcheck disabled.");
812                Ok(TaskOutput::Healthcheck)
813            }
814        };
815
816        let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);
817
818        self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
819        self.healthchecks.insert(key.clone(), healthcheck_task);
820        self.tasks.insert(key.clone(), task);
821        self.detach_triggers.insert(key.clone(), trigger);
822    }
823
824    fn build_transform(
825        &self,
826        transform: Transform,
827        node: TransformNode,
828        input_rx: BufferReceiver<EventArray>,
829    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
830        match transform {
831            // TODO: avoid the double boxing for function transforms here
832            Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
833            Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
834            Transform::Task(t) => self.build_task_transform(t, node, input_rx),
835        }
836    }
837
838    fn build_sync_transform(
839        &self,
840        t: Box<dyn SyncTransform>,
841        node: TransformNode,
842        input_rx: BufferReceiver<EventArray>,
843    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
844        let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
845
846        let sender = self
847            .utilization_registry
848            .add_component(node.key.clone(), gauge!(GaugeName::Utilization));
849        let runner = Runner::new(
850            t,
851            input_rx,
852            sender,
853            node.input_details.data_type(),
854            outputs,
855            LatencyRecorder::new(self.config.global.latency_ewma_alpha),
856            node.cpu_ns.clone(),
857        );
858
859        // Attribute the runner task's per-poll CPU time (driver loop +,
860        // for the inline variant, the transform body) to the component. The
861        // concurrent variant additionally spawns transform invocations onto
862        // their own tasks with `spawn_timed`, which feed the same counter.
863        // When `cpu_ns` is `None` the futures are returned as-is — no
864        // `ThreadTime` sampling takes place.
865        let transform = if node.enable_concurrency {
866            let fut = runner.run_concurrently();
867
868            if let Some(cpu_ns) = node.cpu_ns {
869                fut.cpu_timed(cpu_ns).boxed()
870            } else {
871                fut.boxed()
872            }
873        } else {
874            let fut = runner.run_inline();
875
876            if let Some(cpu_ns) = node.cpu_ns {
877                fut.cpu_timed(cpu_ns).boxed()
878            } else {
879                fut.boxed()
880            }
881        };
882
883        let transform = async move {
884            debug!("Synchronous transform starting.");
885
886            match transform.await {
887                Ok(v) => {
888                    debug!("Synchronous transform finished normally.");
889                    Ok(v)
890                }
891                Err(e) => {
892                    debug!("Synchronous transform finished with an error.");
893                    Err(e)
894                }
895            }
896        };
897
898        let mut output_controls = HashMap::new();
899        for (name, control) in controls {
900            let id = name
901                .map(|name| OutputId::from((&node.key, name)))
902                .unwrap_or_else(|| OutputId::from(&node.key));
903            output_controls.insert(id, control);
904        }
905
906        let task = Task::new(node.key.clone(), node.typetag, transform);
907
908        (task, output_controls)
909    }
910
    /// Builds the task for a task transform: the transform maps the whole (filtered)
    /// input stream to an output stream, which is drained into a single [`Fanout`].
    ///
    /// Only the component's default output is wired up. The returned map holds a single
    /// control channel keyed by `OutputId::from(&key)`.
    ///
    /// # Panics
    /// Panics if `node.outputs` has no entry for the default (`port: None`) output.
    fn build_task_transform(
        &self,
        t: Box<dyn TaskTransform<EventArray>>,
        node: TransformNode,
        input_rx: BufferReceiver<EventArray>,
    ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
        let TransformNode {
            key,
            typetag,
            input_details,
            outputs,
            cpu_ns,
            ..
        } = node;
        let input_type = input_details.data_type();

        let (mut fanout, control) = Fanout::new(key.clone());

        // One clone of the utilization sender is wrapped around the input stream here.
        // The other goes to `OutputUtilization` around the output stream below.
        let sender = self
            .utilization_registry
            .add_component(key.clone(), gauge!(GaugeName::Utilization));
        let output_sender = sender.clone();
        let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());

        // Drop arrays whose event kind this transform does not accept, then count the
        // remaining ones as received.
        let events_received = register!(EventsReceived);
        let filtered = input_rx
            .filter(move |events| ready(filter_events_type(events, input_type)))
            .inspect(move |events| {
                events_received.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ))
            });
        let events_sent = register!(EventsSent::from(internal_event::Output(None)));
        let output_id = Arc::new(OutputId {
            component: key.clone(),
            port: None,
        });
        let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);

        // Task transforms can only write to the default output, so only a single schema def map is needed
        let schema_definition_map = outputs
            .iter()
            .find(|x| x.port.is_none())
            .expect("output for default port required for task transforms")
            .log_schema_definitions
            .clone()
            .into_iter()
            .map(|(key, value)| (key, Arc::new(value)))
            .collect();

        // For each array the transform emits:
        // - attach the default output's runtime schema definition to every event,
        // - record send latency against a single `now`,
        // - carry that timestamp alongside the array towards the fanout.
        let stream = t
            .transform(Box::pin(filtered))
            .map(move |mut events| {
                for event in events.iter_events_mut() {
                    update_runtime_schema_definition(event, &output_id, &schema_definition_map);
                }
                let now = Instant::now();
                latency_recorder.on_send(&mut events, now);
                (events, now)
            })
            // Count arrays as sent when they are pulled from the output stream.
            .inspect(move |(events, _): &(EventArray, Instant)| {
                events_sent.emit(CountByteSize(
                    events.len(),
                    events.estimated_json_encoded_size_of(),
                ));
            });
        let stream = OutputUtilization::new(output_sender, stream);
        let transform = async move {
            debug!("Task transform starting.");

            match fanout.send_stream(stream).await {
                Ok(()) => {
                    debug!("Task transform finished normally.");
                    Ok(TaskOutput::Transform)
                }
                Err(e) => {
                    debug!("Task transform finished with an error.");
                    Err(TaskError::wrapped(e))
                }
            }
        };

        // Attribute this task's per-poll CPU time to the component only when a counter
        // was provided; otherwise box the future unchanged.
        let transform = if let Some(cpu_ns) = cpu_ns {
            transform.cpu_timed(cpu_ns).boxed()
        } else {
            transform.boxed()
        };

        let mut outputs = HashMap::new();
        outputs.insert(OutputId::from(&key), control);

        let task = Task::new(key, typetag, transform);

        (task, outputs)
    }
1007}
1008
/// Forwards every array a source produces into its fanout until the source's
/// channel (`rx`) ends.
///
/// Each array's metadata is stamped with the source id, the source type and a fresh
/// `last_transform_timestamp` before it is sent. Fanout control messages are
/// serviced with priority (the `select!` is `biased`) for as long as
/// `recv_control_message` keeps reporting `true`.
///
/// # Errors
/// Returns a wrapped `TaskError` if sending to the fanout fails.
async fn run_source_output_pump(
    mut rx: LimitedReceiver<SourceSenderItem>,
    mut fanout: Fanout,
    source: Arc<ComponentKey>,
    source_type: &'static str,
) -> TaskResult {
    debug!("Source pump starting.");

    // Cleared once the fanout reports its control channel closed, which disables
    // the control branch below.
    let mut control_channel_open = true;
    loop {
        tokio::select! {
            biased;
            // Process control messages (e.g. Remove/Pause) even when the source
            // is idle, so that config reloads can proceed without waiting for the
            // next event.
            alive = fanout.recv_control_message(), if control_channel_open => {
                control_channel_open = alive;
            }
            item = rx.next() => {
                match item {
                    Some(SourceSenderItem { events: mut array, send_reference }) => {
                        // Even though we have a `send_reference` timestamp above, that reference
                        // time is when the events were enqueued in the `SourceSender`, not when
                        // they were pulled out of the `rx` stream on this end. Since those times
                        // can be quite different (due to blocking inherent to the fanout send
                        // operation), we set the `last_transform_timestamp` to the current time
                        // instead to get an accurate reference for when the events started
                        // waiting for the first transform.
                        let now = Instant::now();
                        array.for_each_metadata_mut(|metadata| {
                            metadata.set_source_id(Arc::clone(&source));
                            metadata.set_source_type(source_type);
                            metadata.set_last_transform_timestamp(now);
                        });
                        fanout
                            .send(array, Some(send_reference))
                            .await
                            .map_err(|e| {
                                debug!("Source pump finished with an error.");
                                TaskError::wrapped(e)
                            })?;
                    }
                    // The source's channel has ended: stop pumping.
                    None => break,
                }
            }
        }
    }

    debug!("Source pump finished normally.");
    Ok(TaskOutput::Source)
}
1060
1061/// Reloads file based enrichment tables - not stateful ones
1062pub async fn reload_enrichment_tables(config: &Config) {
1063    let _enrichment_tables_load_guard = ENRICHMENT_TABLES_LOAD_LOCK.lock().await;
1064    let mut enrichment_tables = HashMap::new();
1065    // Build enrichment tables
1066    'tables: for (name, table_outer) in config.enrichment_tables.iter() {
1067        let table_name = name.to_string();
1068        if ENRICHMENT_TABLES.needs_reload(&table_name)
1069            // Tables that can act as sinks are reloaded through topology
1070            && table_outer.as_sink(name).is_none()
1071        {
1072            let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
1073
1074            let mut table = match table_outer.inner.build(&config.global, None).await {
1075                Ok(table) => table,
1076                Err(error) => {
1077                    error!("Enrichment table \"{name}\" reload failed: {error}");
1078                    continue;
1079                }
1080            };
1081
1082            if let Some(indexes) = indexes {
1083                for (case, index) in indexes {
1084                    match table
1085                        .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
1086                    {
1087                        Ok(_) => (),
1088                        Err(error) => {
1089                            // If there is an error adding an index we do not want to use the reloaded
1090                            // data, the previously loaded data will still need to be used.
1091                            // Just report the error and continue.
1092                            error!(
1093                                message = "Unable to add index to reloaded enrichment table.",
1094                                table = ?name.to_string(),
1095                                %error
1096                            );
1097                            continue 'tables;
1098                        }
1099                    }
1100                }
1101            }
1102
1103            enrichment_tables.insert(table_name, table);
1104        }
1105    }
1106
1107    ENRICHMENT_TABLES.load(enrichment_tables);
1108    ENRICHMENT_TABLES.finish_load();
1109}
1110
/// The runnable pieces produced by building a [`ConfigDiff`] against a config.
///
/// It holds component tasks, healthchecks, the channels used to wire components
/// together, and related shutdown/metrics state.
pub struct TopologyPieces {
    /// Per component: the sender side of its input buffer and its configured inputs.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Per component: fanout control channels for each output port
    /// (a `None` port appears to denote the default output — confirm).
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Per component: its main task (for example, a sink's run loop).
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// Per source component: its task.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// Per component: its healthcheck task (built for sinks).
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Shutdown coordinator for the sources in these pieces (per its type name).
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// Per component: the trigger paired with the tripwire that ends its input
    /// stream (sinks wire the tripwire in via `take_until_if`).
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Metrics storage handle, presumably consumed by VRL metric functions — confirm.
    pub(crate) metrics_storage: MetricsStorage,
    /// Optional utilization emitter together with its registry.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
1122
/// Builder for constructing TopologyPieces with a fluent API.
///
/// # Examples
///
/// ```ignore
/// let pieces = TopologyPiecesBuilder::new(&config, &diff)
///     .with_buffers(buffers)
///     .with_extra_context(extra_context)
///     .build()
///     .await?;
/// ```
pub struct TopologyPiecesBuilder<'a> {
    /// Configuration the pieces are built from.
    config: &'a Config,
    /// Config difference handed to the underlying builder.
    diff: &'a ConfigDiff,
    /// Buffers handed to the underlying builder, keyed by component.
    /// Empty unless set with `with_buffers`.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Extra context for the built components.
    /// `ExtraContext::default()` unless set with `with_extra_context`.
    extra_context: ExtraContext,
    /// Utilization registry for the built components.
    /// `None` unless set with `with_utilization_registry`.
    utilization_registry: Option<UtilizationRegistry>,
}
1141
1142impl<'a> TopologyPiecesBuilder<'a> {
1143    /// Creates a new builder with required parameters.
1144    pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
1145        Self {
1146            config,
1147            diff,
1148            buffers: HashMap::new(),
1149            extra_context: ExtraContext::default(),
1150            utilization_registry: None,
1151        }
1152    }
1153
1154    /// Sets the buffers for the topology.
1155    pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1156        self.buffers = buffers;
1157        self
1158    }
1159
1160    /// Sets the extra context for the topology.
1161    pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1162        self.extra_context = extra_context;
1163        self
1164    }
1165
1166    /// Sets the utilization registry for the topology.
1167    pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1168        self.utilization_registry = registry;
1169        self
1170    }
1171
1172    /// Builds the topology pieces, returning errors if any occur.
1173    ///
1174    /// Use this method when you need to handle errors explicitly,
1175    /// such as in tests or validation code.
1176    pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1177        Builder::new(
1178            self.config,
1179            self.diff,
1180            self.buffers,
1181            self.extra_context,
1182            self.utilization_registry,
1183        )
1184        .build()
1185        .await
1186    }
1187
1188    /// Builds the topology pieces, logging any errors that occur.
1189    ///
1190    /// Use this method for runtime configuration loading where
1191    /// errors should be logged and execution should continue.
1192    pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1193        match self.build().await {
1194            Err(errors) => {
1195                for error in errors {
1196                    error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1197                }
1198                None
1199            }
1200            Ok(new_pieces) => Some(new_pieces),
1201        }
1202    }
1203}
1204
1205impl TopologyPieces {
1206    pub async fn build_or_log_errors(
1207        config: &Config,
1208        diff: &ConfigDiff,
1209        buffers: HashMap<ComponentKey, BuiltBuffer>,
1210        extra_context: ExtraContext,
1211        utilization_registry: Option<UtilizationRegistry>,
1212    ) -> Option<Self> {
1213        TopologyPiecesBuilder::new(config, diff)
1214            .with_buffers(buffers)
1215            .with_extra_context(extra_context)
1216            .with_utilization_registry(utilization_registry)
1217            .build_or_log_errors()
1218            .await
1219    }
1220
1221    /// Builds only the new pieces, and doesn't check their topology.
1222    pub async fn build(
1223        config: &super::Config,
1224        diff: &ConfigDiff,
1225        buffers: HashMap<ComponentKey, BuiltBuffer>,
1226        extra_context: ExtraContext,
1227        utilization_registry: Option<UtilizationRegistry>,
1228    ) -> Result<Self, Vec<String>> {
1229        TopologyPiecesBuilder::new(config, diff)
1230            .with_buffers(buffers)
1231            .with_extra_context(extra_context)
1232            .with_utilization_registry(utilization_registry)
1233            .build()
1234            .await
1235    }
1236}
1237
1238const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1239    match events {
1240        EventArray::Logs(_) => data_type.contains(DataType::Log),
1241        EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1242        EventArray::Traces(_) => data_type.contains(DataType::Trace),
1243    }
1244}
1245
/// Per-transform data extracted from the config (see `TransformNode::from_parts`)
/// that is needed to build the transform's task.
#[derive(Debug, Clone)]
struct TransformNode {
    /// Component key of the transform.
    key: ComponentKey,
    /// Component type name, taken from `get_component_name()`.
    typetag: &'static str,
    /// Upstream outputs this transform reads from.
    inputs: Inputs<OutputId>,
    /// Accepted input description; its `data_type()` is used to filter incoming arrays.
    input_details: Input,
    /// Declared outputs, including their log schema definitions.
    outputs: Vec<TransformOutput>,
    /// For synchronous transforms, selects `Runner::run_concurrently` over
    /// `Runner::run_inline`.
    enable_concurrency: bool,
    /// Optional CPU-time counter; `None` disables CPU-time sampling for the transform.
    cpu_ns: Option<Counter>,
}
1256
1257impl TransformNode {
1258    pub fn from_parts(
1259        key: ComponentKey,
1260        context: &TransformContext,
1261        transform: &TransformOuter<OutputId>,
1262        schema_definition: &[(OutputId, Definition)],
1263    ) -> Self {
1264        Self {
1265            key,
1266            typetag: transform.inner.get_component_name(),
1267            inputs: transform.inputs.clone(),
1268            input_details: transform.inner.input(),
1269            outputs: transform.inner.outputs(context, schema_definition),
1270            enable_concurrency: transform.inner.enable_concurrency(),
1271            cpu_ns: context.cpu_ns.clone(),
1272        }
1273    }
1274}
1275
/// Drives a synchronous transform: pulls arrays from its input buffer, runs the
/// transform on them, and sends the results to the transform's outputs.
struct Runner {
    /// The transform being driven; cloned once per spawned batch in concurrent mode.
    transform: Box<dyn SyncTransform>,
    /// Input buffer; taken exactly once by `run_inline` or `run_concurrently`.
    input_rx: Option<BufferReceiver<EventArray>>,
    /// Event kinds accepted; arrays of other kinds are filtered out.
    input_type: DataType,
    /// Destination for transformed events.
    outputs: TransformOutputs,
    /// Utilization timer, told when the runner starts and stops waiting.
    timer_tx: UtilizationComponentSender,
    /// Records send latency on outgoing arrays.
    latency_recorder: LatencyRecorder,
    /// Emits received-event counts and sizes.
    events_received: Registered<EventsReceived>,
    /// Optional CPU-time counter passed to `spawn_timed` for spawned batches.
    cpu_ns: Option<Counter>,
}
1286
impl Runner {
    /// Creates a runner that has not started yet.
    ///
    /// `input_rx` is stored in an `Option` so that exactly one of `run_inline` or
    /// `run_concurrently` can take it.
    fn new(
        transform: Box<dyn SyncTransform>,
        input_rx: BufferReceiver<EventArray>,
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
        latency_recorder: LatencyRecorder,
        cpu_ns: Option<Counter>,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
            latency_recorder,
            events_received: register!(EventsReceived),
            cpu_ns,
        }
    }

    /// Per-array receive bookkeeping.
    ///
    /// Tells the utilization timer that waiting has stopped, then emits the received
    /// event count and estimated JSON size.
    fn on_events_received(&mut self, events: &EventArray) {
        self.timer_tx.try_send_stop_wait();

        self.events_received.emit(CountByteSize(
            events.len(),
            events.estimated_json_encoded_size_of(),
        ));
    }

    /// Sends a buffer of transformed arrays to the outputs.
    ///
    /// First marks the start of a wait on the utilization timer, then records send
    /// latency on every buffered array using one shared `now`.
    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
        let now = Instant::now();
        outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
        self.outputs.send(outputs_buf).await
    }

    /// Runs the transform on the current task, one input array at a time, sending its
    /// outputs before pulling the next array.
    ///
    /// # Panics
    /// Panics if the runner's input was already taken by a previous run.
    async fn run_inline(mut self) -> TaskResult {
        // 128 is an arbitrary, smallish constant
        const INLINE_BATCH_SIZE: usize = 128;

        // One outputs buffer is allocated up front and passed to every iteration.
        let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);

        let mut input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        // Start in the waiting state: `on_events_received` stops the wait when input
        // arrives, and `send_outputs` starts it again.
        self.timer_tx.try_send_start_wait();
        while let Some(events) = input_rx.next().await {
            self.on_events_received(&events);
            self.transform.transform_all(events, &mut outputs_buf);
            self.send_outputs(&mut outputs_buf)
                .await
                .map_err(TaskError::wrapped)?;
        }

        Ok(TaskOutput::Transform)
    }

    /// Runs the transform on spawned tasks.
    ///
    /// Each batch of already-available input arrays is transformed on its own task
    /// (via `spawn_timed`), with at most `TRANSFORM_CONCURRENCY_LIMIT` batches in
    /// flight. Finished batches are sent in the order they were spawned, because
    /// `in_flight` is a `FuturesOrdered`.
    ///
    /// # Panics
    /// Panics if the runner's input was already taken by a previous run.
    async fn run_concurrently(mut self) -> TaskResult {
        let input_rx = self
            .input_rx
            .take()
            .expect("can't run runner twice")
            .into_stream()
            .filter(move |events| ready(filter_events_type(events, self.input_type)));

        // Group arrays that are already available into batches
        // (capacity from `ready_array_capacity()`).
        let mut input_rx =
            super::ready_arrays::ReadyArrays::with_capacity(input_rx, ready_array_capacity());

        let mut in_flight = FuturesOrdered::new();
        let mut shutting_down = false;

        self.timer_tx.try_send_start_wait();
        loop {
            tokio::select! {
                // Poll branches top to bottom: flush finished batches before taking
                // more input.
                biased;

                result = in_flight.next(), if !in_flight.is_empty() => {
                    match result {
                        Some(Ok(mut outputs_buf)) => {
                            self.send_outputs(&mut outputs_buf).await
                                .map_err(TaskError::wrapped)?;
                        }
                        // NOTE(review): if `spawn_timed` returns a join handle, a panic
                        // inside the transform would surface here as an `Err` and hit
                        // this `unreachable!` rather than a dedicated error path — confirm.
                        _ => unreachable!("join error or bad poll"),
                    }
                }

                // Accept new input only while under the concurrency limit and not
                // shutting down.
                input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
                    match input_arrays {
                        Some(input_arrays) => {
                            let mut len = 0;
                            for events in &input_arrays {
                                self.on_events_received(events);
                                len += events.len();
                            }

                            let mut t = self.transform.clone();
                            let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
                            // Hook CPU-time accounting onto the spawned task at
                            // the `Future::poll` boundary.
                            // This is a separate task from the current one, so there is no double-counting.
                            let task = spawn_timed(
                                async move {
                                    for events in input_arrays {
                                        t.transform_all(events, &mut outputs_buf);
                                    }
                                    outputs_buf
                                },
                                self.cpu_ns.clone(),
                            );
                            in_flight.push_back(task);
                        }
                        None => {
                            // Input is exhausted: stop reading and let the in-flight
                            // batches drain through the first branch.
                            shutting_down = true;
                            continue
                        }
                    }
                }

                // Reached only when every branch above is disabled. Once shutting down,
                // that means nothing is left in flight, so the loop can end.
                else => {
                    if shutting_down {
                        break
                    }
                }
            }
        }

        Ok(TaskOutput::Transform)
    }
}