Skip to main content

vector/config/
transform.rs

1use std::{
2    cell::RefCell,
3    collections::{HashMap, HashSet},
4    path::PathBuf,
5};
6
7use async_trait::async_trait;
8use dyn_clone::DynClone;
9use metrics::Counter;
10use serde::Serialize;
11use vector_lib::{
12    config::{GlobalOptions, Input, LogNamespace, TransformOutput},
13    configurable::{
14        Configurable, GenerateError, Metadata, NamedComponent,
15        attributes::CustomAttribute,
16        configurable_component,
17        schema::{SchemaGenerator, SchemaObject},
18    },
19    id::Inputs,
20    schema,
21    transform::Transform,
22};
23use vector_vrl_metrics::MetricsStorage;
24
25use super::{ComponentKey, OutputId, dot_graph::GraphConfig, schema::Options as SchemaOptions};
26use crate::extra_context::ExtraContext;
27
/// A boxed, dynamically dispatched [`TransformConfig`] trait object.
pub type BoxedTransform = Box<dyn TransformConfig>;
29
30impl Configurable for BoxedTransform {
31    fn referenceable_name() -> Option<&'static str> {
32        Some("vector::transforms::Transforms")
33    }
34
35    fn metadata() -> Metadata {
36        let mut metadata = Metadata::default();
37        metadata.set_description("Configurable transforms in Vector.");
38        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
39        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
40        metadata
41    }
42
43    fn generate_schema(
44        generator: &RefCell<SchemaGenerator>,
45    ) -> Result<SchemaObject, GenerateError> {
46        vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
47    }
48}
49
50impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
51    fn from(that: T) -> Self {
52        Box::new(that)
53    }
54}
55
/// Fully resolved transform component.
// Generic over `T`, the type used to identify each input of the transform.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "transform"))]
#[derive(Clone, Debug)]
pub struct TransformOuter<T>
where
    T: Configurable + Serialize + 'static,
{
    // `GraphConfig` (from the sibling `dot_graph` module) for this transform. Falls back to its
    // default when absent on input and is omitted from serialized output when equal to the default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // The inputs feeding this transform, expressed in terms of `T`.
    #[configurable(derived)]
    pub inputs: Inputs<T>,

    /// Enable CPU usage metrics for this transform.
    ///
    /// When set to `true`, each poll of the transform task is timed using the OS thread CPU clock
    /// and the accumulated nanoseconds are reported as the `component_cpu_usage_ns_total` counter,
    /// tagged with `component_id`, `component_kind`, and `component_type`.
    ///
    /// Defaults to `false`. Enable only for transforms where CPU attribution is needed, as it
    /// adds a `clock_gettime` call on every future poll.
    // Like `graph`, this is omitted from serialized output while it holds the default value.
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub measure_cpu_usage: bool,

    // The concrete transform configuration. `flatten` places its fields at the same level as the
    // fields above rather than nesting them under an `inner` key; hidden from generated docs.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub inner: BoxedTransform,
}
86
87impl<T> TransformOuter<T>
88where
89    T: Configurable + Serialize,
90{
91    pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
92    where
93        I: IntoIterator<Item = T>,
94        IT: Into<BoxedTransform>,
95    {
96        let inputs = Inputs::from_iter(inputs);
97        let inner = inner.into();
98        TransformOuter {
99            inputs,
100            inner,
101            graph: Default::default(),
102            measure_cpu_usage: false,
103        }
104    }
105
106    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
107    where
108        U: Configurable + Serialize,
109    {
110        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
111        self.with_inputs(inputs)
112    }
113
114    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
115    where
116        I: IntoIterator<Item = U>,
117        U: Configurable + Serialize,
118    {
119        TransformOuter {
120            inputs: Inputs::from_iter(inputs),
121            inner: self.inner,
122            graph: self.graph,
123            measure_cpu_usage: self.measure_cpu_usage,
124        }
125    }
126}
127
128pub struct TransformContext {
129    // This is optional because currently there are a lot of places we use `TransformContext` that
130    // may not have the relevant data available (e.g. tests). In the future it'd be nice to make it
131    // required somehow.
132    pub key: Option<ComponentKey>,
133
134    pub globals: GlobalOptions,
135
136    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
137
138    pub metrics_storage: MetricsStorage,
139
140    /// Tracks the schema IDs assigned to schemas exposed by the transform.
141    ///
142    /// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of
143    /// that `TransformOutput`.
144    pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
145
146    /// The schema definition created by merging all inputs of the transform.
147    ///
148    /// This information can be used by transforms that behave differently based on schema
149    /// information, such as the `remap` transform, which passes this information along to the VRL
150    /// compiler such that type coercion becomes less of a need for operators writing VRL programs.
151    pub merged_schema_definition: schema::Definition,
152
153    pub schema: SchemaOptions,
154
155    /// Extra context data provided by the running app and shared across all components. This can be
156    /// used to pass shared settings or other data from outside the components.
157    pub extra_context: ExtraContext,
158
159    /// Counter handle for `component_cpu_usage_ns_total`, pre-tagged with this transform's
160    /// component identity. `Some` only when `measure_cpu_usage` is enabled on the
161    /// `TransformOuter`. Transforms that spawn helper tokio tasks at construction time
162    /// (e.g. `aws_ec2_metadata`, `throttle`) clone this and pass it to [`crate::cpu_time::spawn_timed`] so
163    /// their CPU is attributed to the component alongside the main transform task.
164    pub cpu_ns: Option<Counter>,
165}
166
167impl Default for TransformContext {
168    fn default() -> Self {
169        Self {
170            key: Default::default(),
171            globals: Default::default(),
172            enrichment_tables: Default::default(),
173            metrics_storage: Default::default(),
174            schema_definitions: HashMap::from([(None, HashMap::new())]),
175            merged_schema_definition: schema::Definition::any(),
176            schema: SchemaOptions::default(),
177            extra_context: Default::default(),
178            cpu_ns: None,
179        }
180    }
181}
182
183impl TransformContext {
184    // clippy allow avoids an issue where vrl is flagged off and `globals` is
185    // the sole field in the struct
186    #[allow(clippy::needless_update)]
187    pub fn new_with_globals(globals: GlobalOptions) -> Self {
188        Self {
189            globals,
190            ..Default::default()
191        }
192    }
193
194    #[cfg(test)]
195    pub fn new_test(
196        schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
197    ) -> Self {
198        Self {
199            schema_definitions,
200            ..Default::default()
201        }
202    }
203
204    /// Gets the log namespacing to use. The passed in value is from the transform itself
205    /// and will override any global default if it's set.
206    ///
207    /// This should only be used for transforms that don't originate from a log (eg: `metric_to_log`)
208    /// Most transforms will keep the log_namespace value that already exists on the event.
209    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
210        namespace
211            .or(self.schema.log_namespace)
212            .unwrap_or(false)
213            .into()
214    }
215}
216
217/// Generalized interface for describing and building transform components.
218#[async_trait]
219#[typetag::serde(tag = "type")]
220pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
221    /// Builds the transform with the given context.
222    ///
223    /// If the transform is built successfully, `Ok(...)` is returned containing the transform.
224    ///
225    /// # Errors
226    ///
227    /// If an error occurs while building the transform, an error variant explaining the issue is
228    /// returned.
229    async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;
230
231    /// Gets the input configuration for this transform.
232    fn input(&self) -> Input;
233
234    /// Gets the list of outputs exposed by this transform.
235    ///
236    /// The provided `merged_definition` can be used by transforms to understand the expected shape
237    /// of events flowing through the transform.
238    fn outputs(
239        &self,
240        globals: &TransformContext,
241        input_definitions: &[(OutputId, schema::Definition)],
242    ) -> Vec<TransformOutput>;
243
244    /// Validates that the configuration of the transform is valid.
245    ///
246    /// This would generally be where logical conditions were checked, such as ensuring a transform
247    /// isn't using a named output that matches a reserved output name, and so on.
248    ///
249    /// # Errors
250    ///
251    /// If validation does not succeed, an error variant containing a list of all validation errors
252    /// is returned.
253    fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
254        Ok(())
255    }
256
257    /// Whether or not concurrency should be enabled for this transform.
258    ///
259    /// When enabled, this transform may be run in parallel in order to attempt to maximize
260    /// throughput for this node in the topology. Transforms should generally not run concurrently
261    /// unless they are compute-heavy, as there is a cost/overhead associated with fanning out
262    /// events to the parallel transform tasks.
263    fn enable_concurrency(&self) -> bool {
264        false
265    }
266
267    /// Whether or not this transform can be nested, given the types of transforms it would be
268    /// nested within.
269    ///
270    /// For some transforms, they can expand themselves into a subtopology of nested transforms.
271    /// However, in order to prevent an infinite recursion of nested transforms, we may want to only
272    /// allow one layer of "expansion". Additionally, there may be known issues with a transform
273    /// that is nested under another specific transform interacting poorly, or incorrectly.
274    ///
275    /// This method allows a transform to report if it can or cannot function correctly if it is
276    /// nested under transforms of a specific type, or if such nesting is fundamentally disallowed.
277    fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
278        true
279    }
280
281    /// Gets the files to watch to trigger reload
282    fn files_to_watch(&self) -> Vec<&PathBuf> {
283        Vec::new()
284    }
285}
286
287dyn_clone::clone_trait_object!(TransformConfig);
288
289/// Often we want to call outputs just to retrieve the OutputId's without needing
290/// the schema definitions.
291pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
292    transform: &T,
293    key: ComponentKey,
294    global_log_namespace: LogNamespace,
295) -> impl Iterator<Item = OutputId> + '_ {
296    transform
297        .outputs(
298            &TransformContext {
299                schema: SchemaOptions {
300                    log_namespace: Some(global_log_namespace.into()),
301                    ..Default::default()
302                },
303                ..Default::default()
304            },
305            &[(key.clone().into(), schema::Definition::any())],
306        )
307        .into_iter()
308        .map(move |output| OutputId {
309            component: key.clone(),
310            port: output.port,
311        })
312}