Skip to main content

vector/transforms/
route.rs

1use indexmap::IndexMap;
2use vector_lib::{
3    config::clone_input_definitions, configurable::configurable_component, transform::SyncTransform,
4};
5
6use crate::{
7    conditions::{AnyCondition, Condition, ConditionConfig, VrlConfig},
8    config::{
9        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
10        TransformOutput,
11    },
12    event::Event,
13    schema,
14    transforms::Transform,
15};
16
/// Output port that receives events which matched none of the configured routes.
///
/// The port is only declared (and written to) when `reroute_unmatched` is enabled, and a route
/// ID equal to this name is rejected by `RouteConfig::validate`.
pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";
18
/// Runtime state of the `route` transform, built from a [`RouteConfig`] by [`Route::new`].
#[derive(Clone)]
pub struct Route {
    /// Compiled conditions, each paired with the name of the output port it feeds, in the
    /// order the routes appear in `RouteConfig::route`.
    conditions: Vec<(String, Condition)>,
    /// When `true`, events that match no condition are pushed to the `_unmatched` port;
    /// when `false`, such events are not pushed anywhere (dropped).
    reroute_unmatched: bool,
}
24
25impl Route {
26    pub fn new(config: &RouteConfig, context: &TransformContext) -> crate::Result<Self> {
27        let mut conditions = Vec::with_capacity(config.route.len());
28        for (output_name, condition) in config.route.iter() {
29            let condition =
30                condition.build(&context.enrichment_tables, &context.metrics_storage)?;
31            conditions.push((output_name.clone(), condition));
32        }
33        Ok(Self {
34            conditions,
35            reroute_unmatched: config.reroute_unmatched,
36        })
37    }
38}
39
40impl SyncTransform for Route {
41    fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) {
42        let mut check_failed: usize = 0;
43        for (output_name, condition) in &self.conditions {
44            let (result, event) = condition.check(event.clone());
45            if result {
46                output.push(Some(output_name), event);
47            } else {
48                check_failed += 1;
49            }
50        }
51        if self.reroute_unmatched && check_failed == self.conditions.len() {
52            output.push(Some(UNMATCHED_ROUTE), event);
53        }
54    }
55}
56
/// Configuration for the `route` transform.
#[configurable_component(transform(
    "route",
    "Split a stream of events into multiple sub-streams based on user-supplied conditions."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RouteConfig {
    // Serde serializes fields in declaration order, and this file's serialization test expects
    // `reroute_unmatched` to precede `route` in the output; keep the fields in this order.

    /// Reroutes unmatched events to a named output instead of silently discarding them.
    ///
    /// Normally, if an event doesn't match any defined route, it is sent to the `<transform_name>._unmatched`
    /// output for further processing. In some cases, you may want to simply discard unmatched events and not
    /// process them any further.
    ///
    /// In these cases, `reroute_unmatched` can be set to `false` to disable the `<transform_name>._unmatched`
    /// output and instead silently discard any unmatched events.
    // When the key is omitted, `default_true` makes this `true`, so `_unmatched` is on by default.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Reroute Unmatched Events"))]
    reroute_unmatched: bool,

    /// A map from route identifiers to logical conditions.
    /// Each condition represents a filter which is applied to each event.
    ///
    /// The following identifiers are reserved output names and thus cannot be used as route IDs:
    /// - `_unmatched`
    /// - `_default`
    ///
    /// Each route can then be referenced as an input by other components with the name
    /// `<transform_name>.<route_id>`. If an event doesn’t match any route, and if `reroute_unmatched`
    /// is set to `true` (the default), it is sent to the `<transform_name>._unmatched` output.
    /// Otherwise, the unmatched event is instead silently discarded.
    // `IndexMap` preserves insertion order, so `Route::new` compiles conditions and `outputs`
    // declares ports in the order the routes are written in the config.
    // NOTE(review): the docs above list `_default` as reserved, but `validate` in this file only
    // rejects `_unmatched`; presumably `_default` is enforced by shared config validation — confirm.
    #[configurable(metadata(docs::additional_props_description = "An individual route."))]
    #[configurable(metadata(docs::examples = "route_examples()"))]
    route: IndexMap<String, AnyCondition>,
}
92
93fn route_examples() -> IndexMap<String, AnyCondition> {
94    IndexMap::from([
95        (
96            "foo-exists".to_owned(),
97            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
98                source: "exists(.foo)".to_owned(),
99                ..Default::default()
100            })),
101        ),
102        (
103            "foo-does-not-exist".to_owned(),
104            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
105                source: "!exists(.foo)".to_owned(),
106                ..Default::default()
107            })),
108        ),
109    ])
110}
111
112impl GenerateConfig for RouteConfig {
113    fn generate_config() -> toml::Value {
114        toml::Value::try_from(Self {
115            reroute_unmatched: true,
116            route: route_examples(),
117        })
118        .unwrap()
119    }
120}
121
122#[async_trait::async_trait]
123#[typetag::serde(name = "route")]
124impl TransformConfig for RouteConfig {
125    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
126        let route = Route::new(self, context)?;
127        Ok(Transform::synchronous(route))
128    }
129
130    fn input(&self) -> Input {
131        Input::all()
132    }
133
134    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
135        if self.route.contains_key(UNMATCHED_ROUTE) {
136            Err(vec![format!(
137                "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
138            )])
139        } else {
140            Ok(())
141        }
142    }
143
144    fn outputs(
145        &self,
146        _: &TransformContext,
147        input_definitions: &[(OutputId, schema::Definition)],
148    ) -> Vec<TransformOutput> {
149        let mut result: Vec<TransformOutput> = self
150            .route
151            .keys()
152            .map(|output_name| {
153                TransformOutput::new(
154                    DataType::all_bits(),
155                    clone_input_definitions(input_definitions),
156                )
157                .with_port(output_name)
158            })
159            .collect();
160        if self.reroute_unmatched {
161            result.push(
162                TransformOutput::new(
163                    DataType::all_bits(),
164                    clone_input_definitions(input_definitions),
165                )
166                .with_port(UNMATCHED_ROUTE),
167            );
168        }
169        result
170    }
171
172    fn enable_concurrency(&self) -> bool {
173        true
174    }
175}
176
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use indoc::indoc;
    use vector_lib::{config::LogNamespace, transform::TransformOutputsBuf};

    use super::*;
    use crate::{
        config::{ConfigBuilder, build_unit_tests},
        test_util::components::{COMPONENT_MULTIPLE_OUTPUTS_TESTS, init_test},
    };

    /// Every port the synchronous `Route` tests drain: the three routes declared in
    /// `THREE_ROUTES`, plus the `_unmatched` port.
    const OUTPUT_NAMES: [&str; 4] = ["first", "second", "third", UNMATCHED_ROUTE];

    /// Three VRL routes matching on `.message`, `.second` and `.third` respectively.
    /// `reroute_unmatched` is deliberately left unset so the default value is exercised.
    const THREE_ROUTES: &str = indoc! {"
        route:
          first:
            type: vrl
            source: '.message == \"hello world\"'
          second:
            type: vrl
            source: '.second == \"second\"'
          third:
            type: vrl
            source: '.third == \"third\"'
    "};

    /// Parses a `RouteConfig` from YAML; panics on a malformed test fixture.
    fn parse_config(yaml: &str) -> RouteConfig {
        serde_yaml::from_str::<RouteConfig>(yaml).unwrap()
    }

    /// Parses a legacy-namespace event from a JSON value; panics on a malformed fixture.
    fn log_event(value: serde_json::Value) -> Event {
        Event::from_json_value(value, LogNamespace::Legacy).unwrap()
    }

    /// Builds a `Route` from `config`, feeds it one `event`, and returns the output buffer
    /// (one port per entry in `OUTPUT_NAMES`) for the caller to drain.
    fn run_route(config: &RouteConfig, event: Event) -> TransformOutputsBuf {
        let mut transform = Route::new(config, &Default::default()).unwrap();
        let mut outputs = TransformOutputsBuf::new_with_capacity(
            OUTPUT_NAMES
                .iter()
                .map(|&output_name| {
                    TransformOutput::new(DataType::all_bits(), HashMap::new())
                        .with_port(output_name)
                })
                .collect(),
            1,
        );
        transform.transform(event, &mut outputs);
        outputs
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::RouteConfig>();
    }

    #[test]
    fn can_serialize_route() {
        // We need to serialize the config to check if a config has
        // changed when reloading.
        let config = parse_config(indoc! {"
            route:
              first:
                type: vrl
                source: '.message == \"hello world\"'
        "});

        assert_eq!(
            serde_json::to_string(&config).unwrap(),
            r#"{"reroute_unmatched":true,"route":{"first":{"type":"vrl","source":".message == \"hello world\""}}}"#
        );
    }

    #[test]
    fn route_pass_all_route_conditions() {
        let event = log_event(
            serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
        );
        let mut outputs = run_route(&parse_config(THREE_ROUTES), event.clone());

        for output_name in OUTPUT_NAMES {
            let mut events: Vec<_> = outputs.drain_named(output_name).collect();
            if output_name == UNMATCHED_ROUTE {
                assert!(events.is_empty());
            } else {
                assert_eq!(events.len(), 1);
                assert_eq!(events.pop().unwrap(), event);
            }
        }
    }

    #[test]
    fn route_pass_one_route_condition() {
        let event = log_event(serde_json::json!({"message": "hello world"}));
        let mut outputs = run_route(&parse_config(THREE_ROUTES), event.clone());

        for output_name in OUTPUT_NAMES {
            let mut events: Vec<_> = outputs.drain_named(output_name).collect();
            if output_name == "first" {
                assert_eq!(events.len(), 1);
                assert_eq!(events.pop().unwrap(), event);
            }
            assert_eq!(events.len(), 0);
        }
    }

    #[test]
    fn route_pass_no_route_condition() {
        let event = log_event(serde_json::json!({"message": "NOPE"}));
        let mut outputs = run_route(&parse_config(THREE_ROUTES), event.clone());

        for output_name in OUTPUT_NAMES {
            let mut events: Vec<_> = outputs.drain_named(output_name).collect();
            if output_name == UNMATCHED_ROUTE {
                assert_eq!(events.len(), 1);
                assert_eq!(events.pop().unwrap(), event);
            }
            assert_eq!(events.len(), 0);
        }
    }

    #[test]
    fn route_no_unmatched_output() {
        let event = log_event(serde_json::json!({"message": "NOPE"}));
        // Still parsed from YAML so deserialization of an explicit `false` stays covered.
        let config = parse_config(&format!("reroute_unmatched: false\n{THREE_ROUTES}"));
        let mut outputs = run_route(&config, event);

        for output_name in OUTPUT_NAMES {
            assert_eq!(outputs.drain_named(output_name).count(), 0);
        }
    }

    #[tokio::test]
    async fn route_metrics_with_output_tag() {
        init_test();

        let config: ConfigBuilder = serde_yaml::from_str(indoc! {"
            transforms:
              foo:
                inputs: []
                type: route
                route:
                  first:
                    type: is_log
            tests:
              - name: metric output
                input:
                  insert_at: foo
                  value: none
                outputs:
                  - extract_from: foo.first
                    conditions:
                      - type: vrl
                        source: \"true\"
        "})
        .unwrap();

        let mut tests = build_unit_tests(config).await.unwrap();
        assert!(tests.remove(0).run().await.errors.is_empty());
        // Check that metrics were emitted with output tag
        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
    }
}