1use std::{
2 cell::RefCell,
3 collections::{HashMap, HashSet},
4 path::PathBuf,
5};
6
7use async_trait::async_trait;
8use dyn_clone::DynClone;
9use metrics::Counter;
10use serde::Serialize;
11use vector_lib::{
12 config::{GlobalOptions, Input, LogNamespace, TransformOutput},
13 configurable::{
14 Configurable, GenerateError, Metadata, NamedComponent,
15 attributes::CustomAttribute,
16 configurable_component,
17 schema::{SchemaGenerator, SchemaObject},
18 },
19 id::Inputs,
20 schema,
21 transform::Transform,
22};
23use vector_vrl_metrics::MetricsStorage;
24
25use super::{ComponentKey, OutputId, dot_graph::GraphConfig, schema::Options as SchemaOptions};
26use crate::extra_context::ExtraContext;
27
28pub type BoxedTransform = Box<dyn TransformConfig>;
29
30impl Configurable for BoxedTransform {
31 fn referenceable_name() -> Option<&'static str> {
32 Some("vector::transforms::Transforms")
33 }
34
35 fn metadata() -> Metadata {
36 let mut metadata = Metadata::default();
37 metadata.set_description("Configurable transforms in Vector.");
38 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
39 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
40 metadata
41 }
42
43 fn generate_schema(
44 generator: &RefCell<SchemaGenerator>,
45 ) -> Result<SchemaObject, GenerateError> {
46 vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
47 }
48}
49
50impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
51 fn from(that: T) -> Self {
52 Box::new(that)
53 }
54}
55
56#[configurable_component]
58#[configurable(metadata(docs::component_base_type = "transform"))]
59#[derive(Clone, Debug)]
60pub struct TransformOuter<T>
61where
62 T: Configurable + Serialize + 'static,
63{
64 #[configurable(derived)]
65 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
66 pub graph: GraphConfig,
67
68 #[configurable(derived)]
69 pub inputs: Inputs<T>,
70
71 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
80 pub measure_cpu_usage: bool,
81
82 #[configurable(metadata(docs::hidden))]
83 #[serde(flatten)]
84 pub inner: BoxedTransform,
85}
86
87impl<T> TransformOuter<T>
88where
89 T: Configurable + Serialize,
90{
91 pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
92 where
93 I: IntoIterator<Item = T>,
94 IT: Into<BoxedTransform>,
95 {
96 let inputs = Inputs::from_iter(inputs);
97 let inner = inner.into();
98 TransformOuter {
99 inputs,
100 inner,
101 graph: Default::default(),
102 measure_cpu_usage: false,
103 }
104 }
105
106 pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
107 where
108 U: Configurable + Serialize,
109 {
110 let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
111 self.with_inputs(inputs)
112 }
113
114 pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
115 where
116 I: IntoIterator<Item = U>,
117 U: Configurable + Serialize,
118 {
119 TransformOuter {
120 inputs: Inputs::from_iter(inputs),
121 inner: self.inner,
122 graph: self.graph,
123 measure_cpu_usage: self.measure_cpu_usage,
124 }
125 }
126}
127
128pub struct TransformContext {
129 pub key: Option<ComponentKey>,
133
134 pub globals: GlobalOptions,
135
136 pub enrichment_tables: vector_lib::enrichment::TableRegistry,
137
138 pub metrics_storage: MetricsStorage,
139
140 pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
145
146 pub merged_schema_definition: schema::Definition,
152
153 pub schema: SchemaOptions,
154
155 pub extra_context: ExtraContext,
158
159 pub cpu_ns: Option<Counter>,
165}
166
167impl Default for TransformContext {
168 fn default() -> Self {
169 Self {
170 key: Default::default(),
171 globals: Default::default(),
172 enrichment_tables: Default::default(),
173 metrics_storage: Default::default(),
174 schema_definitions: HashMap::from([(None, HashMap::new())]),
175 merged_schema_definition: schema::Definition::any(),
176 schema: SchemaOptions::default(),
177 extra_context: Default::default(),
178 cpu_ns: None,
179 }
180 }
181}
182
183impl TransformContext {
184 #[allow(clippy::needless_update)]
187 pub fn new_with_globals(globals: GlobalOptions) -> Self {
188 Self {
189 globals,
190 ..Default::default()
191 }
192 }
193
194 #[cfg(test)]
195 pub fn new_test(
196 schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
197 ) -> Self {
198 Self {
199 schema_definitions,
200 ..Default::default()
201 }
202 }
203
204 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
210 namespace
211 .or(self.schema.log_namespace)
212 .unwrap_or(false)
213 .into()
214 }
215}
216
/// Configuration of a transform component.
///
/// Implementations are (de)serialized through `typetag`, internally tagged by a
/// `type` field, and are usually held as `BoxedTransform`.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the runtime `Transform` described by this configuration.
    ///
    /// Despite its name, `globals` is the full `TransformContext`, not just the
    /// `GlobalOptions`.
    ///
    /// # Errors
    ///
    /// Returns an error if the transform cannot be built.
    async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;

    /// Describes the input this transform accepts.
    fn input(&self) -> Input;

    /// Returns the outputs this transform produces, given the context and the
    /// `(OutputId, schema::Definition)` pairs of its inputs.
    ///
    /// `get_transform_output_ids` calls this with a mostly-default context (only the
    /// log namespace set) to discover output ports, so implementations should not
    /// rely on other context fields to decide which ports exist.
    fn outputs(
        &self,
        globals: &TransformContext,
        input_definitions: &[(OutputId, schema::Definition)],
    ) -> Vec<TransformOutput>;

    /// Validates this transform against a merged schema definition, returning a
    /// list of error messages on failure.
    ///
    /// The default implementation accepts everything.
    // NOTE(review): the argument is presumably `TransformContext::merged_schema_definition` — confirm at the call site.
    fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
        Ok(())
    }

    /// Whether concurrency is enabled for this transform. Defaults to `false`.
    // NOTE(review): the effect is decided by whoever builds the topology — confirm there.
    fn enable_concurrency(&self) -> bool {
        false
    }

    /// Whether this transform may be nested given `_parents`. Defaults to `true`.
    // NOTE(review): `_parents` presumably holds the names of enclosing component types — verify against callers.
    fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
        true
    }

    /// Files associated with this transform that should be watched. Defaults to none.
    // NOTE(review): presumably used to trigger reloads when these files change — confirm.
    fn files_to_watch(&self) -> Vec<&PathBuf> {
        Vec::new()
    }
}

// Implements `Clone` for `Box<dyn TransformConfig>` (i.e. `BoxedTransform`) through
// the `DynClone` supertrait. This is what lets `TransformOuter` derive `Clone`.
dyn_clone::clone_trait_object!(TransformConfig);
288
289pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
292 transform: &T,
293 key: ComponentKey,
294 global_log_namespace: LogNamespace,
295) -> impl Iterator<Item = OutputId> + '_ {
296 transform
297 .outputs(
298 &TransformContext {
299 schema: SchemaOptions {
300 log_namespace: Some(global_log_namespace.into()),
301 ..Default::default()
302 },
303 ..Default::default()
304 },
305 &[(key.clone().into(), schema::Definition::any())],
306 )
307 .into_iter()
308 .map(move |output| OutputId {
309 component: key.clone(),
310 port: output.port,
311 })
312}