1use std::{
2 collections::HashMap,
3 future::ready,
4 num::NonZeroUsize,
5 sync::{
6 Arc, LazyLock, Mutex,
7 atomic::{AtomicUsize, Ordering},
8 },
9 time::Instant,
10};
11
12use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
13use futures_util::stream::FuturesUnordered;
14use metrics::Counter;
15use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
16use tokio::{
17 select,
18 sync::{Mutex as AsyncMutex, mpsc::UnboundedSender, oneshot},
19 time::timeout,
20};
21use tracing::{Instrument, Span};
22use vector_lib::{
23 EstimatedJsonEncodedSizeOf,
24 buffers::{
25 BufferType, WhenFull,
26 topology::{
27 builder::TopologyBuilder,
28 channel::{
29 BufferChannelKind, BufferReceiver, BufferSender, ChannelMetricMetadata,
30 LimitedReceiver,
31 },
32 },
33 },
34 enrichment::Table,
35 internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
36 latency::LatencyRecorder,
37 schema::Definition,
38 source_sender::{DEFAULT_CHUNK_SIZE_EVENTS, SourceSenderItem},
39 transform::update_runtime_schema_definition,
40};
41use vector_lib::{gauge, internal_event::GaugeName};
42use vector_vrl_metrics::MetricsStorage;
43
44use super::{
45 BuiltBuffer, ConfigDiff,
46 fanout::{self, Fanout},
47 schema,
48 task::{Task, TaskOutput, TaskResult},
49};
50use crate::{
51 SourceSender,
52 config::{
53 ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
54 ProxyConfig, SinkContext, SinkOuter, SourceContext, SourceOuter, TransformContext,
55 TransformOuter, TransformOutput,
56 },
57 cpu_time::{CpuTimedExt, spawn_timed},
58 event::{EventArray, EventContainer},
59 extra_context::ExtraContext,
60 internal_events::EventsReceived,
61 shutdown::SourceShutdownCoordinator,
62 spawn_named,
63 topology::task::TaskError,
64 transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
65 utilization::{
66 OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
67 UtilizationRegistry,
68 },
69};
70
71static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
72 LazyLock::new(vector_lib::enrichment::TableRegistry::default);
73static ENRICHMENT_TABLES_LOAD_LOCK: LazyLock<AsyncMutex<()>> = LazyLock::new(AsyncMutex::default);
76static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);
77
78static SOURCE_SENDER_BUFFER_SIZE: AtomicUsize = AtomicUsize::new(0);
82static READY_ARRAY_CAPACITY: AtomicUsize = AtomicUsize::new(0);
83
84pub(crate) const READY_ARRAY_CAPACITY_CHUNKS: usize = 4;
87
88pub(crate) fn source_sender_buffer_size() -> usize {
90 match SOURCE_SENDER_BUFFER_SIZE.load(Ordering::Relaxed) {
91 0 => *TRANSFORM_CONCURRENCY_LIMIT * DEFAULT_CHUNK_SIZE_EVENTS,
92 size => size,
93 }
94}
95
96pub(crate) fn set_source_sender_buffer_size(size: usize) {
99 SOURCE_SENDER_BUFFER_SIZE
100 .compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed)
101 .unwrap_or_else(|_| panic!("double source_sender_buffer_size initialization"));
102}
103
104pub(crate) fn ready_array_capacity() -> NonZeroUsize {
106 let size = match READY_ARRAY_CAPACITY.load(Ordering::Relaxed) {
107 0 => DEFAULT_CHUNK_SIZE_EVENTS * READY_ARRAY_CAPACITY_CHUNKS,
108 size => size,
109 };
110 NonZeroUsize::new(size).expect("ready array capacity is non-zero")
111}
112
113pub(crate) fn set_ready_array_capacity(size: usize) {
116 READY_ARRAY_CAPACITY
117 .compare_exchange(0, size, Ordering::Acquire, Ordering::Relaxed)
118 .unwrap_or_else(|_| panic!("double ready_array_capacity initialization"));
119}
120
121pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
122
123static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
124 crate::app::worker_threads()
125 .map(std::num::NonZeroUsize::get)
126 .unwrap_or_else(crate::num_threads)
127});
128
129const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
130
/// Accumulates everything produced while building the new parts of a topology.
///
/// The `build_*` methods fill in the collections below; `build` then either converts them into
/// a `TopologyPieces` or returns the collected `errors`.
struct Builder<'a> {
    /// Configuration the topology is being built from.
    config: &'a super::Config,
    /// Diff used to decide which components are new and therefore need building.
    diff: &'a ConfigDiff,
    /// Coordinator every built source is registered with.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Human-readable build errors; a non-empty list makes `build` fail.
    errors: Vec<String>,
    /// Fanout control channels, keyed by the output they belong to.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    /// Component tasks: source pump supervisors, transforms and sinks.
    tasks: HashMap<ComponentKey, Task>,
    /// Pre-built sink buffers; a sink without an entry here gets a freshly built buffer.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Input sender and declared inputs for every built transform and sink.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Healthcheck tasks, one per built sink.
    healthchecks: HashMap<ComponentKey, Task>,
    /// Triggers paired with the tripwire that ends each sink's input stream.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Extra context cloned into every source, transform and sink context.
    extra_context: ExtraContext,
    /// `Some` only when this builder created its own utilization emitter/registry pair.
    utilization_emitter: Option<UtilizationEmitter>,
    /// Registry that transforms and sinks add their utilization components to.
    utilization_registry: UtilizationRegistry,
}
146
147impl<'a> Builder<'a> {
148 fn new(
149 config: &'a super::Config,
150 diff: &'a ConfigDiff,
151 buffers: HashMap<ComponentKey, BuiltBuffer>,
152 extra_context: ExtraContext,
153 utilization_registry: Option<UtilizationRegistry>,
154 ) -> Self {
155 let (emitter, registry) = if let Some(registry) = utilization_registry {
158 (None, registry)
159 } else {
160 let (emitter, registry) = UtilizationEmitter::new();
161 (Some(emitter), registry)
162 };
163 Self {
164 config,
165 diff,
166 buffers,
167 shutdown_coordinator: SourceShutdownCoordinator::default(),
168 errors: vec![],
169 outputs: HashMap::new(),
170 tasks: HashMap::new(),
171 inputs: HashMap::new(),
172 healthchecks: HashMap::new(),
173 detach_triggers: HashMap::new(),
174 extra_context,
175 utilization_emitter: emitter,
176 utilization_registry: registry,
177 }
178 }
179
180 async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
182 let _enrichment_tables_load_guard = ENRICHMENT_TABLES_LOAD_LOCK.lock().await;
183 let enrichment_tables = self.load_enrichment_tables().await;
184 let source_tasks = self.build_sources(enrichment_tables).await;
185 self.build_transforms(enrichment_tables).await;
186 self.build_sinks(enrichment_tables).await;
187
188 if self.errors.is_empty() {
189 enrichment_tables.finish_load();
192
193 Ok(TopologyPieces {
194 inputs: self.inputs,
195 outputs: Self::finalize_outputs(self.outputs),
196 tasks: self.tasks,
197 source_tasks,
198 healthchecks: self.healthchecks,
199 shutdown_coordinator: self.shutdown_coordinator,
200 detach_triggers: self.detach_triggers,
201 metrics_storage: METRICS_STORAGE.clone(),
202 utilization: self
203 .utilization_emitter
204 .map(|e| (e, self.utilization_registry)),
205 })
206 } else {
207 Err(self.errors)
208 }
209 }
210
211 fn finalize_outputs(
212 outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
213 ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
214 {
215 let mut finalized_outputs = HashMap::new();
216 for (id, output) in outputs {
217 let entry = finalized_outputs
218 .entry(id.component)
219 .or_insert_with(HashMap::new);
220 entry.insert(id.port, output);
221 }
222
223 finalized_outputs
224 }
225
226 async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
229 let mut enrichment_tables: HashMap<String, Box<dyn Table + Send + Sync>> = HashMap::new();
230
231 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
233 let table_name = name.to_string();
234 if ENRICHMENT_TABLES.needs_reload(&table_name)
235 || self.diff.enrichment_tables.tables.is_changed(name)
236 || self.diff.enrichment_tables.tables.is_added(name)
237 {
238 let indexes = ENRICHMENT_TABLES.index_fields(&table_name);
242
243 let mut prev_state = None;
244 if !self.diff.enrichment_tables.tables.is_added(name)
245 && table_outer.inner.wants_previous_state()
246 {
247 prev_state = ENRICHMENT_TABLES.extract_state(&table_name);
248 }
249
250 let mut table = match table_outer
251 .inner
252 .build(&self.config.global, prev_state)
253 .await
254 {
255 Ok(table) => table,
256 Err(error) => {
257 self.errors
258 .push(format!("Enrichment Table \"{name}\": {error}"));
259 continue;
260 }
261 };
262
263 for (case, index) in indexes {
264 match table
265 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
266 {
267 Ok(_) => (),
268 Err(error) => {
269 error!(message = "Unable to add index to reloaded enrichment table.",
273 table = ?name.to_string(),
274 %error);
275 continue 'tables;
276 }
277 }
278 }
279
280 enrichment_tables.insert(table_name, table);
281 }
282 }
283
284 ENRICHMENT_TABLES.load(enrichment_tables);
285
286 &ENRICHMENT_TABLES
287 }
288
289 async fn build_sources(
290 &mut self,
291 enrichment_tables: &vector_lib::enrichment::TableRegistry,
292 ) -> HashMap<ComponentKey, Task> {
293 let mut source_tasks = HashMap::new();
294
295 let table_sources = self
296 .config
297 .enrichment_tables
298 .iter()
299 .filter_map(|(key, table)| table.as_source(key))
300 .collect::<Vec<_>>();
301 for (key, source) in self
302 .config
303 .sources()
304 .filter(|(key, _)| self.diff.sources.contains_new(key))
305 .chain(
306 table_sources
307 .iter()
308 .map(|(key, source)| (key, source))
309 .filter(|(key, _)| self.diff.enrichment_tables.sources.contains_new(key)),
310 )
311 {
312 debug!(component_id = %key, "Building new source.");
313
314 let span = error_span!(
315 "source",
316 component_kind = "source",
317 component_id = %key.id(),
318 component_type = %source.inner.get_component_name(),
319 );
320
321 if let Ok(server) = self
322 .build_instrumented_source(key, source, enrichment_tables)
323 .instrument(span)
324 .await
325 {
326 source_tasks.insert(key.clone(), server);
327 }
328 }
329
330 source_tasks
331 }
332
333 async fn build_instrumented_source(
334 &mut self,
335 key: &ComponentKey,
336 source: &SourceOuter,
337 enrichment_tables: &vector_lib::enrichment::TableRegistry,
338 ) -> Result<Task, ()> {
339 let typetag = source.inner.get_component_name();
340 let source_outputs = source.inner.outputs(self.config.schema.log_namespace());
341
342 let task_name = format!(
343 ">> {} ({}, pump) >>",
344 source.inner.get_component_name(),
345 key.id()
346 );
347
348 let mut builder = SourceSender::builder()
349 .with_buffer(source_sender_buffer_size())
350 .with_timeout(source.inner.send_timeout())
351 .with_ewma_half_life_seconds(
352 self.config.global.buffer_utilization_ewma_half_life_seconds,
353 );
354 let mut pumps = Vec::new();
355 let mut controls = HashMap::new();
356 let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
357
358 for output in source_outputs.into_iter() {
359 let rx = builder.add_source_output(output.clone(), key.clone());
360
361 let (fanout, control) = Fanout::new(key.clone());
362 let source_type = source.inner.get_component_name();
363 let source = Arc::new(key.clone());
364
365 let pump = run_source_output_pump(rx, fanout, source, source_type);
366
367 pumps.push(pump.instrument(Span::current()));
368 controls.insert(
369 OutputId {
370 component: key.clone(),
371 port: output.port.clone(),
372 },
373 control,
374 );
375
376 let port = output.port.clone();
377 if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
378 schema_definitions.insert(port, definition);
379 }
380 }
381
382 let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
383 let pump = async move {
384 debug!("Source pump supervisor starting.");
385
386 let mut handles = FuturesUnordered::new();
391 for pump in pumps {
392 handles.push(spawn_named(pump, task_name.as_ref()));
393 }
394
395 let mut had_pump_error = false;
396 while let Some(output) = handles.try_next().await? {
397 if let Err(e) = output {
398 _ = pump_error_tx.send(e);
401 had_pump_error = true;
402 break;
403 }
404 }
405
406 if had_pump_error {
407 debug!("Source pump supervisor task finished with an error.");
408 } else {
409 debug!("Source pump supervisor task finished normally.");
410 }
411 Ok(TaskOutput::Source)
412 };
413 let pump = Task::new(key.clone(), typetag, pump);
414
415 let (shutdown_signal, force_shutdown_tripwire) = self
416 .shutdown_coordinator
417 .register_source(key, INTERNAL_SOURCES.contains(&typetag));
418
419 let context = SourceContext {
420 key: key.clone(),
421 globals: self.config.global.clone(),
422 enrichment_tables: enrichment_tables.clone(),
423 metrics_storage: METRICS_STORAGE.clone(),
424 shutdown: shutdown_signal,
425 out: builder.build(),
426 proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
427 acknowledgements: source.sink_acknowledgements,
428 schema_definitions,
429 schema: self.config.schema,
430 extra_context: self.extra_context.clone(),
431 };
432 let server = match source.inner.build(context).await {
433 Err(error) => {
434 self.errors.push(format!("Source \"{key}\": {error}"));
435 return Err(());
436 }
437 Ok(server) => server,
438 };
439
440 let server = async move {
448 debug!("Source starting.");
449
450 let mut result = select! {
451 biased;
452
453 _ = force_shutdown_tripwire => Ok(()),
455
456 Ok(e) = &mut pump_error_rx => Err(e),
462
463 result = server => result.map_err(|_| TaskError::Opaque),
465 };
466
467 if let Ok(e) = pump_error_rx.try_recv() {
476 result = Err(e);
477 }
478
479 match result {
480 Ok(()) => {
481 debug!("Source finished normally.");
482 Ok(TaskOutput::Source)
483 }
484 Err(e) => {
485 debug!("Source finished with an error.");
486 Err(e)
487 }
488 }
489 };
490 let server = Task::new(key.clone(), typetag, server);
491
492 self.outputs.extend(controls);
493 self.tasks.insert(key.clone(), pump);
494
495 Ok(server)
496 }
497
498 async fn build_transforms(
499 &mut self,
500 enrichment_tables: &vector_lib::enrichment::TableRegistry,
501 ) {
502 let mut definition_cache = HashMap::default();
503
504 for (key, transform) in self
505 .config
506 .transforms()
507 .filter(|(key, _)| self.diff.transforms.contains_new(key))
508 {
509 debug!(component_id = %key, "Building new transform.");
510
511 let input_definitions = match schema::input_definitions(
512 &transform.inputs,
513 self.config,
514 enrichment_tables.clone(),
515 &mut definition_cache,
516 ) {
517 Ok(definitions) => definitions,
518 Err(_) => {
519 return;
523 }
524 };
525
526 let span = error_span!(
527 "transform",
528 component_kind = "transform",
529 component_id = %key.id(),
530 component_type = %transform.inner.get_component_name(),
531 );
532
533 self.build_instrumented_transform(key, transform, enrichment_tables, input_definitions)
534 .instrument(span)
535 .await;
536 }
537 }
538
539 async fn build_instrumented_transform(
540 &mut self,
541 key: &ComponentKey,
542 transform: &TransformOuter<OutputId>,
543 enrichment_tables: &vector_lib::enrichment::TableRegistry,
544 input_definitions: Vec<(OutputId, Definition)>,
545 ) {
546 let merged_definition: Definition = input_definitions
547 .iter()
548 .map(|(_output_id, definition)| definition.clone())
549 .reduce(Definition::merge)
550 .unwrap_or_else(Definition::any);
552
553 let schema_definitions = transform
555 .inner
556 .outputs(
557 &TransformContext {
558 enrichment_tables: enrichment_tables.clone(),
559 metrics_storage: METRICS_STORAGE.clone(),
560 schema: self.config.schema,
561 ..Default::default()
562 },
563 &input_definitions,
564 )
565 .into_iter()
566 .map(|output| {
567 let definitions = output.schema_definitions(self.config.schema.enabled);
568 (output.port, definitions)
569 })
570 .collect::<HashMap<_, _>>();
571
572 let context = TransformContext {
573 key: Some(key.clone()),
574 globals: self.config.global.clone(),
575 enrichment_tables: enrichment_tables.clone(),
576 metrics_storage: METRICS_STORAGE.clone(),
577 schema_definitions,
578 merged_schema_definition: merged_definition.clone(),
579 schema: self.config.schema,
580 extra_context: self.extra_context.clone(),
581 cpu_ns: if transform.measure_cpu_usage {
591 Some(crate::cpu_time::register_counter())
592 } else {
593 None
594 },
595 };
596
597 let node = TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);
598
599 let transform = match transform
600 .inner
601 .build(&context)
602 .instrument(Span::current())
603 .await
604 {
605 Err(error) => {
606 self.errors.push(format!("Transform \"{key}\": {error}"));
607 return;
608 }
609 Ok(transform) => transform,
610 };
611
612 let metrics = ChannelMetricMetadata::new(BufferChannelKind::Transform, None);
613 let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
614 TOPOLOGY_BUFFER_SIZE,
615 WhenFull::Block,
616 &Span::current(),
617 Some(metrics),
618 self.config.global.buffer_utilization_ewma_half_life_seconds,
619 );
620
621 self.inputs
622 .insert(key.clone(), (input_tx, node.inputs.clone()));
623
624 let (transform_task, transform_outputs) = self.build_transform(transform, node, input_rx);
625
626 self.outputs.extend(transform_outputs);
627 self.tasks.insert(key.clone(), transform_task);
628 }
629
630 async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
631 let table_sinks = self
632 .config
633 .enrichment_tables
634 .iter()
635 .filter_map(|(key, table)| table.as_sink(key))
636 .collect::<Vec<_>>();
637 for (key, sink) in self
638 .config
639 .sinks()
640 .filter(|(key, _)| self.diff.sinks.contains_new(key))
641 .chain(
642 table_sinks
643 .iter()
644 .map(|(key, sink)| (key, sink))
645 .filter(|(key, _)| self.diff.enrichment_tables.sinks.contains_new(key)),
646 )
647 {
648 debug!(component_id = %key, "Building new sink.");
649
650 let span = error_span!(
651 "sink",
652 component_kind = "sink",
653 component_id = %key.id(),
654 component_type = %sink.inner.get_component_name(),
655 );
656
657 self.build_instrumented_sink(key, sink, enrichment_tables)
658 .instrument(span)
659 .await;
660 }
661 }
662
663 async fn build_instrumented_sink(
664 &mut self,
665 key: &ComponentKey,
666 sink: &SinkOuter<OutputId>,
667 enrichment_tables: &vector_lib::enrichment::TableRegistry,
668 ) {
669 let sink_inputs = &sink.inputs;
670 let healthcheck = sink.healthcheck();
671 let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
672 let healthcheck_timeout = healthcheck.timeout;
673
674 let typetag = sink.inner.get_component_name();
675 let input_type = sink.inner.input().data_type();
676
677 if let Err(mut err) =
681 schema::validate_sink_expectations(key, sink, self.config, enrichment_tables.clone())
682 {
683 self.errors.append(&mut err);
684 };
685
686 let (tx, rx) = match self.buffers.remove(key) {
687 Some(buffer) => buffer,
688 _ => {
689 let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") {
690 BufferType::Memory { .. } => "memory",
691 BufferType::DiskV2 { .. } => "disk",
692 };
693 let buffer_span = error_span!("sink", buffer_type);
694 let buffer = sink
695 .buffer
696 .build(
697 self.config.global.data_dir.clone(),
698 key.to_string(),
699 buffer_span,
700 )
701 .await;
702 match buffer {
703 Err(error) => {
704 self.errors.push(format!("Sink \"{key}\": {error}"));
705 return;
706 }
707 Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
708 }
709 }
710 };
711
712 let cx = SinkContext {
713 healthcheck,
714 globals: self.config.global.clone(),
715 enrichment_tables: enrichment_tables.clone(),
716 metrics_storage: METRICS_STORAGE.clone(),
717 proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
718 schema: self.config.schema,
719 app_name: crate::get_app_name().to_string(),
720 app_name_slug: crate::get_slugified_app_name(),
721 extra_context: self.extra_context.clone(),
722 };
723
724 let (sink, healthcheck) = match sink.inner.build(cx).await {
725 Err(error) => {
726 self.errors.push(format!("Sink \"{key}\": {error}"));
727 return;
728 }
729 Ok(built) => built,
730 };
731
732 let (trigger, tripwire) = Tripwire::new();
733
734 let utilization_sender = self
735 .utilization_registry
736 .add_component(key.clone(), gauge!(GaugeName::Utilization));
737 let component_key = key.clone();
738 let sink = async move {
739 debug!("Sink starting.");
740
741 let rx = rx
748 .lock()
749 .unwrap()
750 .take()
751 .expect("Task started but input has been taken.");
752
753 let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);
754
755 let events_received = register!(EventsReceived);
756 sink.run(
757 rx.by_ref()
758 .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
759 .inspect(|events| {
760 events_received.emit(CountByteSize(
761 events.len(),
762 events.estimated_json_encoded_size_of(),
763 ))
764 })
765 .take_until_if(tripwire),
766 )
767 .await
768 .map(|_| {
769 debug!("Sink finished normally.");
770 TaskOutput::Sink(rx)
771 })
772 .map_err(|_| {
773 debug!("Sink finished with an error.");
774 TaskError::Opaque
775 })
776 };
777
778 let task = Task::new(key.clone(), typetag, sink);
779
780 let component_key = key.clone();
781 let healthcheck_task = async move {
782 if enable_healthcheck {
783 timeout(healthcheck_timeout, healthcheck)
784 .map(|result| match result {
785 Ok(Ok(_)) => {
786 info!("Healthcheck passed.");
787 Ok(TaskOutput::Healthcheck)
788 }
789 Ok(Err(error)) => {
790 error!(
791 msg = "Healthcheck failed.",
792 %error,
793 component_kind = "sink",
794 component_type = typetag,
795 component_id = %component_key.id(),
796 );
797 Err(TaskError::wrapped(error))
798 }
799 Err(e) => {
800 error!(
801 msg = "Healthcheck timed out.",
802 component_kind = "sink",
803 component_type = typetag,
804 component_id = %component_key.id(),
805 );
806 Err(TaskError::wrapped(Box::new(e)))
807 }
808 })
809 .await
810 } else {
811 info!("Healthcheck disabled.");
812 Ok(TaskOutput::Healthcheck)
813 }
814 };
815
816 let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);
817
818 self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
819 self.healthchecks.insert(key.clone(), healthcheck_task);
820 self.tasks.insert(key.clone(), task);
821 self.detach_triggers.insert(key.clone(), trigger);
822 }
823
824 fn build_transform(
825 &self,
826 transform: Transform,
827 node: TransformNode,
828 input_rx: BufferReceiver<EventArray>,
829 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
830 match transform {
831 Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
833 Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
834 Transform::Task(t) => self.build_task_transform(t, node, input_rx),
835 }
836 }
837
838 fn build_sync_transform(
839 &self,
840 t: Box<dyn SyncTransform>,
841 node: TransformNode,
842 input_rx: BufferReceiver<EventArray>,
843 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
844 let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
845
846 let sender = self
847 .utilization_registry
848 .add_component(node.key.clone(), gauge!(GaugeName::Utilization));
849 let runner = Runner::new(
850 t,
851 input_rx,
852 sender,
853 node.input_details.data_type(),
854 outputs,
855 LatencyRecorder::new(self.config.global.latency_ewma_alpha),
856 node.cpu_ns.clone(),
857 );
858
859 let transform = if node.enable_concurrency {
866 let fut = runner.run_concurrently();
867
868 if let Some(cpu_ns) = node.cpu_ns {
869 fut.cpu_timed(cpu_ns).boxed()
870 } else {
871 fut.boxed()
872 }
873 } else {
874 let fut = runner.run_inline();
875
876 if let Some(cpu_ns) = node.cpu_ns {
877 fut.cpu_timed(cpu_ns).boxed()
878 } else {
879 fut.boxed()
880 }
881 };
882
883 let transform = async move {
884 debug!("Synchronous transform starting.");
885
886 match transform.await {
887 Ok(v) => {
888 debug!("Synchronous transform finished normally.");
889 Ok(v)
890 }
891 Err(e) => {
892 debug!("Synchronous transform finished with an error.");
893 Err(e)
894 }
895 }
896 };
897
898 let mut output_controls = HashMap::new();
899 for (name, control) in controls {
900 let id = name
901 .map(|name| OutputId::from((&node.key, name)))
902 .unwrap_or_else(|| OutputId::from(&node.key));
903 output_controls.insert(id, control);
904 }
905
906 let task = Task::new(node.key.clone(), node.typetag, transform);
907
908 (task, output_controls)
909 }
910
911 fn build_task_transform(
912 &self,
913 t: Box<dyn TaskTransform<EventArray>>,
914 node: TransformNode,
915 input_rx: BufferReceiver<EventArray>,
916 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
917 let TransformNode {
918 key,
919 typetag,
920 input_details,
921 outputs,
922 cpu_ns,
923 ..
924 } = node;
925 let input_type = input_details.data_type();
926
927 let (mut fanout, control) = Fanout::new(key.clone());
928
929 let sender = self
930 .utilization_registry
931 .add_component(key.clone(), gauge!(GaugeName::Utilization));
932 let output_sender = sender.clone();
933 let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());
934
935 let events_received = register!(EventsReceived);
936 let filtered = input_rx
937 .filter(move |events| ready(filter_events_type(events, input_type)))
938 .inspect(move |events| {
939 events_received.emit(CountByteSize(
940 events.len(),
941 events.estimated_json_encoded_size_of(),
942 ))
943 });
944 let events_sent = register!(EventsSent::from(internal_event::Output(None)));
945 let output_id = Arc::new(OutputId {
946 component: key.clone(),
947 port: None,
948 });
949 let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);
950
951 let schema_definition_map = outputs
953 .iter()
954 .find(|x| x.port.is_none())
955 .expect("output for default port required for task transforms")
956 .log_schema_definitions
957 .clone()
958 .into_iter()
959 .map(|(key, value)| (key, Arc::new(value)))
960 .collect();
961
962 let stream = t
963 .transform(Box::pin(filtered))
964 .map(move |mut events| {
965 for event in events.iter_events_mut() {
966 update_runtime_schema_definition(event, &output_id, &schema_definition_map);
967 }
968 let now = Instant::now();
969 latency_recorder.on_send(&mut events, now);
970 (events, now)
971 })
972 .inspect(move |(events, _): &(EventArray, Instant)| {
973 events_sent.emit(CountByteSize(
974 events.len(),
975 events.estimated_json_encoded_size_of(),
976 ));
977 });
978 let stream = OutputUtilization::new(output_sender, stream);
979 let transform = async move {
980 debug!("Task transform starting.");
981
982 match fanout.send_stream(stream).await {
983 Ok(()) => {
984 debug!("Task transform finished normally.");
985 Ok(TaskOutput::Transform)
986 }
987 Err(e) => {
988 debug!("Task transform finished with an error.");
989 Err(TaskError::wrapped(e))
990 }
991 }
992 };
993
994 let transform = if let Some(cpu_ns) = cpu_ns {
995 transform.cpu_timed(cpu_ns).boxed()
996 } else {
997 transform.boxed()
998 };
999
1000 let mut outputs = HashMap::new();
1001 outputs.insert(OutputId::from(&key), control);
1002
1003 let task = Task::new(key, typetag, transform);
1004
1005 (task, outputs)
1006 }
1007}
1008
1009async fn run_source_output_pump(
1010 mut rx: LimitedReceiver<SourceSenderItem>,
1011 mut fanout: Fanout,
1012 source: Arc<ComponentKey>,
1013 source_type: &'static str,
1014) -> TaskResult {
1015 debug!("Source pump starting.");
1016
1017 let mut control_channel_open = true;
1018 loop {
1019 tokio::select! {
1020 biased;
1021 alive = fanout.recv_control_message(), if control_channel_open => {
1025 control_channel_open = alive;
1026 }
1027 item = rx.next() => {
1028 match item {
1029 Some(SourceSenderItem { events: mut array, send_reference }) => {
1030 let now = Instant::now();
1038 array.for_each_metadata_mut(|metadata| {
1039 metadata.set_source_id(Arc::clone(&source));
1040 metadata.set_source_type(source_type);
1041 metadata.set_last_transform_timestamp(now);
1042 });
1043 fanout
1044 .send(array, Some(send_reference))
1045 .await
1046 .map_err(|e| {
1047 debug!("Source pump finished with an error.");
1048 TaskError::wrapped(e)
1049 })?;
1050 }
1051 None => break,
1052 }
1053 }
1054 }
1055 }
1056
1057 debug!("Source pump finished normally.");
1058 Ok(TaskOutput::Source)
1059}
1060
1061pub async fn reload_enrichment_tables(config: &Config) {
1063 let _enrichment_tables_load_guard = ENRICHMENT_TABLES_LOAD_LOCK.lock().await;
1064 let mut enrichment_tables = HashMap::new();
1065 'tables: for (name, table_outer) in config.enrichment_tables.iter() {
1067 let table_name = name.to_string();
1068 if ENRICHMENT_TABLES.needs_reload(&table_name)
1069 && table_outer.as_sink(name).is_none()
1071 {
1072 let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
1073
1074 let mut table = match table_outer.inner.build(&config.global, None).await {
1075 Ok(table) => table,
1076 Err(error) => {
1077 error!("Enrichment table \"{name}\" reload failed: {error}");
1078 continue;
1079 }
1080 };
1081
1082 if let Some(indexes) = indexes {
1083 for (case, index) in indexes {
1084 match table
1085 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
1086 {
1087 Ok(_) => (),
1088 Err(error) => {
1089 error!(
1093 message = "Unable to add index to reloaded enrichment table.",
1094 table = ?name.to_string(),
1095 %error
1096 );
1097 continue 'tables;
1098 }
1099 }
1100 }
1101 }
1102
1103 enrichment_tables.insert(table_name, table);
1104 }
1105 }
1106
1107 ENRICHMENT_TABLES.load(enrichment_tables);
1108 ENRICHMENT_TABLES.finish_load();
1109}
1110
/// Everything produced by a successful topology build.
pub struct TopologyPieces {
    /// Input sender and declared inputs for every built transform and sink.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Fanout control channels, grouped by component and then by output port.
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Component tasks: source pump supervisors, transforms and sinks.
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// Tasks that drive the sources themselves.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// Healthcheck tasks, one per built sink.
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Coordinator the built sources were registered with.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// Triggers paired with the tripwire that ends each sink's input stream.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Clone of the process-wide metrics storage.
    pub(crate) metrics_storage: MetricsStorage,
    /// Utilization emitter and registry; `None` when an existing registry was supplied to the
    /// build.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
1122
1123pub struct TopologyPiecesBuilder<'a> {
1135 config: &'a Config,
1136 diff: &'a ConfigDiff,
1137 buffers: HashMap<ComponentKey, BuiltBuffer>,
1138 extra_context: ExtraContext,
1139 utilization_registry: Option<UtilizationRegistry>,
1140}
1141
1142impl<'a> TopologyPiecesBuilder<'a> {
1143 pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
1145 Self {
1146 config,
1147 diff,
1148 buffers: HashMap::new(),
1149 extra_context: ExtraContext::default(),
1150 utilization_registry: None,
1151 }
1152 }
1153
1154 pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1156 self.buffers = buffers;
1157 self
1158 }
1159
1160 pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1162 self.extra_context = extra_context;
1163 self
1164 }
1165
1166 pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1168 self.utilization_registry = registry;
1169 self
1170 }
1171
1172 pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1177 Builder::new(
1178 self.config,
1179 self.diff,
1180 self.buffers,
1181 self.extra_context,
1182 self.utilization_registry,
1183 )
1184 .build()
1185 .await
1186 }
1187
1188 pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1193 match self.build().await {
1194 Err(errors) => {
1195 for error in errors {
1196 error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1197 }
1198 None
1199 }
1200 Ok(new_pieces) => Some(new_pieces),
1201 }
1202 }
1203}
1204
1205impl TopologyPieces {
1206 pub async fn build_or_log_errors(
1207 config: &Config,
1208 diff: &ConfigDiff,
1209 buffers: HashMap<ComponentKey, BuiltBuffer>,
1210 extra_context: ExtraContext,
1211 utilization_registry: Option<UtilizationRegistry>,
1212 ) -> Option<Self> {
1213 TopologyPiecesBuilder::new(config, diff)
1214 .with_buffers(buffers)
1215 .with_extra_context(extra_context)
1216 .with_utilization_registry(utilization_registry)
1217 .build_or_log_errors()
1218 .await
1219 }
1220
1221 pub async fn build(
1223 config: &super::Config,
1224 diff: &ConfigDiff,
1225 buffers: HashMap<ComponentKey, BuiltBuffer>,
1226 extra_context: ExtraContext,
1227 utilization_registry: Option<UtilizationRegistry>,
1228 ) -> Result<Self, Vec<String>> {
1229 TopologyPiecesBuilder::new(config, diff)
1230 .with_buffers(buffers)
1231 .with_extra_context(extra_context)
1232 .with_utilization_registry(utilization_registry)
1233 .build()
1234 .await
1235 }
1236}
1237
1238const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1239 match events {
1240 EventArray::Logs(_) => data_type.contains(DataType::Log),
1241 EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1242 EventArray::Traces(_) => data_type.contains(DataType::Trace),
1243 }
1244}
1245
1246#[derive(Debug, Clone)]
1247struct TransformNode {
1248 key: ComponentKey,
1249 typetag: &'static str,
1250 inputs: Inputs<OutputId>,
1251 input_details: Input,
1252 outputs: Vec<TransformOutput>,
1253 enable_concurrency: bool,
1254 cpu_ns: Option<Counter>,
1255}
1256
1257impl TransformNode {
1258 pub fn from_parts(
1259 key: ComponentKey,
1260 context: &TransformContext,
1261 transform: &TransformOuter<OutputId>,
1262 schema_definition: &[(OutputId, Definition)],
1263 ) -> Self {
1264 Self {
1265 key,
1266 typetag: transform.inner.get_component_name(),
1267 inputs: transform.inputs.clone(),
1268 input_details: transform.inner.input(),
1269 outputs: transform.inner.outputs(context, schema_definition),
1270 enable_concurrency: transform.inner.enable_concurrency(),
1271 cpu_ns: context.cpu_ns.clone(),
1272 }
1273 }
1274}
1275
1276struct Runner {
1277 transform: Box<dyn SyncTransform>,
1278 input_rx: Option<BufferReceiver<EventArray>>,
1279 input_type: DataType,
1280 outputs: TransformOutputs,
1281 timer_tx: UtilizationComponentSender,
1282 latency_recorder: LatencyRecorder,
1283 events_received: Registered<EventsReceived>,
1284 cpu_ns: Option<Counter>,
1285}
1286
1287impl Runner {
1288 fn new(
1289 transform: Box<dyn SyncTransform>,
1290 input_rx: BufferReceiver<EventArray>,
1291 timer_tx: UtilizationComponentSender,
1292 input_type: DataType,
1293 outputs: TransformOutputs,
1294 latency_recorder: LatencyRecorder,
1295 cpu_ns: Option<Counter>,
1296 ) -> Self {
1297 Self {
1298 transform,
1299 input_rx: Some(input_rx),
1300 input_type,
1301 outputs,
1302 timer_tx,
1303 latency_recorder,
1304 events_received: register!(EventsReceived),
1305 cpu_ns,
1306 }
1307 }
1308
1309 fn on_events_received(&mut self, events: &EventArray) {
1310 self.timer_tx.try_send_stop_wait();
1311
1312 self.events_received.emit(CountByteSize(
1313 events.len(),
1314 events.estimated_json_encoded_size_of(),
1315 ));
1316 }
1317
1318 async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
1319 self.timer_tx.try_send_start_wait();
1320 let now = Instant::now();
1321 outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
1322 self.outputs.send(outputs_buf).await
1323 }
1324
1325 async fn run_inline(mut self) -> TaskResult {
1326 const INLINE_BATCH_SIZE: usize = 128;
1328
1329 let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);
1330
1331 let mut input_rx = self
1332 .input_rx
1333 .take()
1334 .expect("can't run runner twice")
1335 .into_stream()
1336 .filter(move |events| ready(filter_events_type(events, self.input_type)));
1337
1338 self.timer_tx.try_send_start_wait();
1339 while let Some(events) = input_rx.next().await {
1340 self.on_events_received(&events);
1341 self.transform.transform_all(events, &mut outputs_buf);
1342 self.send_outputs(&mut outputs_buf)
1343 .await
1344 .map_err(TaskError::wrapped)?;
1345 }
1346
1347 Ok(TaskOutput::Transform)
1348 }
1349
1350 async fn run_concurrently(mut self) -> TaskResult {
1351 let input_rx = self
1352 .input_rx
1353 .take()
1354 .expect("can't run runner twice")
1355 .into_stream()
1356 .filter(move |events| ready(filter_events_type(events, self.input_type)));
1357
1358 let mut input_rx =
1359 super::ready_arrays::ReadyArrays::with_capacity(input_rx, ready_array_capacity());
1360
1361 let mut in_flight = FuturesOrdered::new();
1362 let mut shutting_down = false;
1363
1364 self.timer_tx.try_send_start_wait();
1365 loop {
1366 tokio::select! {
1367 biased;
1368
1369 result = in_flight.next(), if !in_flight.is_empty() => {
1370 match result {
1371 Some(Ok(mut outputs_buf)) => {
1372 self.send_outputs(&mut outputs_buf).await
1373 .map_err(TaskError::wrapped)?;
1374 }
1375 _ => unreachable!("join error or bad poll"),
1376 }
1377 }
1378
1379 input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1380 match input_arrays {
1381 Some(input_arrays) => {
1382 let mut len = 0;
1383 for events in &input_arrays {
1384 self.on_events_received(events);
1385 len += events.len();
1386 }
1387
1388 let mut t = self.transform.clone();
1389 let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
1390 let task = spawn_timed(
1394 async move {
1395 for events in input_arrays {
1396 t.transform_all(events, &mut outputs_buf);
1397 }
1398 outputs_buf
1399 },
1400 self.cpu_ns.clone(),
1401 );
1402 in_flight.push_back(task);
1403 }
1404 None => {
1405 shutting_down = true;
1406 continue
1407 }
1408 }
1409 }
1410
1411 else => {
1412 if shutting_down {
1413 break
1414 }
1415 }
1416 }
1417 }
1418
1419 Ok(TaskOutput::Transform)
1420 }
1421}