Skip to main content

enrichment/
find_enrichment_table_records.rs

1use std::collections::BTreeMap;
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7    Case, Condition, IndexHandle, TableRegistry, TableSearch,
8    vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11const PARAMETERS: &[Parameter] = &[
12    Parameter::required(
13        "table",
14        kind::BYTES,
15        "The [enrichment table](/docs/reference/glossary/#enrichment-tables) to search.",
16    ),
17    Parameter::required(
18        "condition",
19        kind::OBJECT,
20        "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
21    ),
22    Parameter::optional(
23        "select",
24        kind::ARRAY,
25        "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
26    ),
27    Parameter::optional(
28        "case_sensitive",
29        kind::BOOLEAN,
30        "Whether text fields need to match cases exactly.",
31    )
32    .default(&DEFAULT_CASE_SENSITIVE),
33    Parameter::optional(
34        "wildcard",
35        kind::BYTES,
36        "Value to use for wildcard matching in the search.",
37    ),
38];
39
40fn find_enrichment_table_records(
41    select: Option<Value>,
42    enrichment_tables: &TableSearch,
43    table: &str,
44    case_sensitive: Case,
45    wildcard: Option<Value>,
46    condition: &[Condition],
47    index: Option<IndexHandle>,
48) -> Resolved {
49    let select = select
50        .map(|select| match select {
51            Value::Array(arr) => arr
52                .iter()
53                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
54                .collect::<std::result::Result<Vec<_>, _>>(),
55            value => Err(ValueError::Expected {
56                got: value.kind(),
57                expected: Kind::array(Collection::any()),
58            }),
59        })
60        .transpose()?;
61
62    let data = enrichment_tables
63        .find_table_rows(
64            table,
65            case_sensitive,
66            condition,
67            select.as_ref().map(|select| select.as_ref()),
68            wildcard.as_ref(),
69            index,
70        )?
71        .into_iter()
72        .map(Value::Object)
73        .collect();
74    Ok(Value::Array(data))
75}
76
77#[derive(Clone, Copy, Debug)]
78pub struct FindEnrichmentTableRecords;
79impl Function for FindEnrichmentTableRecords {
80    fn identifier(&self) -> &'static str {
81        "find_enrichment_table_records"
82    }
83
84    fn usage(&self) -> &'static str {
85        const_str::concat!(
86            "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for rows that match the provided condition.\n\n",
87            super::ENRICHMENT_TABLE_EXPLAINER
88        )
89    }
90
91    fn category(&self) -> &'static str {
92        Category::Enrichment.as_ref()
93    }
94
95    fn return_kind(&self) -> u16 {
96        kind::ARRAY
97    }
98
99    fn parameters(&self) -> &'static [Parameter] {
100        PARAMETERS
101    }
102
103    fn examples(&self) -> &'static [Example] {
104        const RESULT: Result<&'static str, &'static str> = Ok(indoc! {r#"
105            [{"id": 1, "firstname": "Bob", "surname": "Smith"},
106             {"id": 2, "firstname": "Fred", "surname": "Smith"}]
107        "#});
108
109        const EXAMPLES: &[Example] = &[
110            example! {
111                title: "Exact match",
112                source: indoc! {r#"
113                    find_enrichment_table_records!(
114                        "test",
115                        {"surname": "Smith"}
116                    )
117                "#},
118                result: RESULT,
119            },
120            example! {
121                title: "Case insensitive match",
122                source: indoc! {r#"
123                    find_enrichment_table_records!(
124                        "test",
125                        {"surname": "smith"},
126                        case_sensitive: false
127                    )
128                "#},
129                result: RESULT,
130            },
131            example! {
132                title: "Wildcard match",
133                source: indoc! {r#"
134                    find_enrichment_table_records!(
135                        "test",
136                        {"firstname": "Bob"},
137                        wildcard: "fred",
138                        case_sensitive: false
139                    )
140                "#},
141                result: RESULT,
142            },
143            example! {
144                title: "Date range search",
145                source: indoc! {r#"
146                    find_enrichment_table_records!(
147                        "test",
148                        {
149                            "surname": "Smith",
150                            "date_of_birth": {
151                                "from": t'1985-01-01T00:00:00Z',
152                                "to": t'1985-12-31T00:00:00Z'
153                            }
154                        }
155                    )
156                "#},
157                result: RESULT,
158            },
159        ];
160
161        EXAMPLES
162    }
163
164    fn compile(
165        &self,
166        state: &TypeState,
167        ctx: &mut FunctionCompileContext,
168        arguments: ArgumentList,
169    ) -> Compiled {
170        let registry = ctx
171            .get_external_context_mut::<TableRegistry>()
172            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
173
174        let tables = registry
175            .table_ids()
176            .into_iter()
177            .map(Value::from)
178            .collect::<Vec<_>>();
179
180        let table = arguments
181            .required_enum("table", &tables, state)?
182            .try_bytes_utf8_lossy()
183            .expect("table is not valid utf8")
184            .into_owned();
185        let condition = arguments.required_object("condition")?;
186
187        let select = arguments.optional("select");
188
189        let case_sensitive = is_case_sensitive(&arguments, state)?;
190        let wildcard = arguments.optional("wildcard");
191        let index = Some(
192            add_index(registry, &table, case_sensitive, &condition)
193                .map_err(|err| Box::new(err) as Box<_>)?,
194        );
195
196        Ok(FindEnrichmentTableRecordsFn {
197            table,
198            condition,
199            index,
200            select,
201            case_sensitive,
202            wildcard,
203            enrichment_tables: registry.as_readonly(),
204        }
205        .as_expr())
206    }
207}
208
209#[derive(Debug, Clone)]
210pub struct FindEnrichmentTableRecordsFn {
211    table: String,
212    condition: BTreeMap<KeyString, expression::Expr>,
213    index: Option<IndexHandle>,
214    select: Option<Box<dyn Expression>>,
215    case_sensitive: Case,
216    wildcard: Option<Box<dyn Expression>>,
217    enrichment_tables: TableSearch,
218}
219
220impl FunctionExpression for FindEnrichmentTableRecordsFn {
221    fn resolve(&self, ctx: &mut Context) -> Resolved {
222        let condition = self
223            .condition
224            .iter()
225            .map(|(key, value)| {
226                let value = value.resolve(ctx)?;
227                evaluate_condition(key, value)
228            })
229            .collect::<ExpressionResult<Vec<Condition>>>()?;
230
231        let select = self
232            .select
233            .as_ref()
234            .map(|array| array.resolve(ctx))
235            .transpose()?;
236
237        let table = &self.table;
238        let case_sensitive = self.case_sensitive;
239        let wildcard = self
240            .wildcard
241            .as_ref()
242            .map(|array| array.resolve(ctx))
243            .transpose()?;
244        let index = self.index;
245        let enrichment_tables = &self.enrichment_tables;
246
247        find_enrichment_table_records(
248            select,
249            enrichment_tables,
250            table,
251            case_sensitive,
252            wildcard,
253            &condition,
254            index,
255        )
256    }
257
258    fn type_def(&self, _: &TypeState) -> TypeDef {
259        TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use vrl::{
266        compiler::{TargetValue, TimeZone, state::RuntimeState},
267        value,
268        value::Secrets,
269    };
270
271    use super::*;
272    use crate::test_util::get_table_registry;
273
274    #[test]
275    fn find_table_row() {
276        let registry = get_table_registry();
277        let func = FindEnrichmentTableRecordsFn {
278            table: "dummy1".to_string(),
279            condition: BTreeMap::from([(
280                "field".into(),
281                expression::Literal::from("value").into(),
282            )]),
283            index: Some(IndexHandle(999)),
284            select: None,
285            case_sensitive: Case::Sensitive,
286            wildcard: None,
287            enrichment_tables: registry.as_readonly(),
288        };
289
290        let tz = TimeZone::default();
291        let object: Value = ObjectMap::new().into();
292        let mut target = TargetValue {
293            value: object,
294            metadata: value!({}),
295            secrets: Secrets::new(),
296        };
297        let mut runtime_state = RuntimeState::default();
298        let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
299
300        registry.finish_load();
301
302        let got = func.resolve(&mut ctx);
303
304        assert_eq!(Ok(value![vec![value!({ "field": "result" })]]), got);
305    }
306}