Skip to main content

enrichment/
get_enrichment_table_record.rs

1use std::collections::BTreeMap;
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7    Case, Condition, IndexHandle, TableRegistry, TableSearch,
8    vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11const PARAMETERS: &[Parameter] = &[
12    Parameter::required(
13        "table",
14        kind::BYTES,
15        "The [enrichment table](/docs/reference/glossary/#enrichment-tables) to search.",
16    ),
17    Parameter::required(
18        "condition",
19        kind::OBJECT,
20        "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
21    ),
22    Parameter::optional(
23        "select",
24        kind::ARRAY,
25        "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
26    ),
27    Parameter::optional(
28        "case_sensitive",
29        kind::BOOLEAN,
30        "Whether the text fields match the case exactly.",
31    )
32    .default(&DEFAULT_CASE_SENSITIVE),
33    Parameter::optional(
34        "wildcard",
35        kind::BYTES,
36        "Value to use for wildcard matching in the search.",
37    ),
38];
39
40fn get_enrichment_table_record(
41    select: Option<Value>,
42    enrichment_tables: &TableSearch,
43    table: &str,
44    case_sensitive: Case,
45    wildcard: Option<Value>,
46    condition: &[Condition],
47    index: Option<IndexHandle>,
48) -> Resolved {
49    let select = select
50        .map(|array| match array {
51            Value::Array(arr) => arr
52                .iter()
53                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
54                .collect::<std::result::Result<Vec<_>, _>>(),
55            value => Err(ValueError::Expected {
56                got: value.kind(),
57                expected: Kind::array(Collection::any()),
58            }),
59        })
60        .transpose()?;
61
62    let data = enrichment_tables.find_table_row(
63        table,
64        case_sensitive,
65        condition,
66        select.as_ref().map(|select| select.as_ref()),
67        wildcard.as_ref(),
68        index,
69    )?;
70
71    Ok(Value::Object(data))
72}
73
74#[derive(Clone, Copy, Debug)]
75pub struct GetEnrichmentTableRecord;
76impl Function for GetEnrichmentTableRecord {
77    fn identifier(&self) -> &'static str {
78        "get_enrichment_table_record"
79    }
80
81    fn usage(&self) -> &'static str {
82        const USAGE: &str = const_str::concat!(
83            "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for a row that matches the provided condition. A single row must be matched. If no rows are found or more than one row is found, an error is returned.\n\n",
84            super::ENRICHMENT_TABLE_EXPLAINER
85        );
86        USAGE
87    }
88
89    fn internal_failure_reasons(&self) -> &'static [&'static str] {
90        &[
91            "The row is not found.",
92            "Multiple rows are found that match the condition.",
93        ]
94    }
95
96    fn category(&self) -> &'static str {
97        Category::Enrichment.as_ref()
98    }
99
100    fn return_kind(&self) -> u16 {
101        kind::OBJECT
102    }
103
104    fn parameters(&self) -> &'static [Parameter] {
105        PARAMETERS
106    }
107
108    fn examples(&self) -> &'static [Example] {
109        &[
110            example! {
111                title: "Exact match",
112                source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
113                result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
114            },
115            example! {
116                title: "Case insensitive match",
117                source: indoc !{r#"
118                    get_enrichment_table_record!(
119                        "test",
120                        {"surname": "bob", "firstname": "John"},
121                        case_sensitive: false
122                    )
123                "#},
124                result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
125            },
126            example! {
127                title: "Date range search",
128                source: indoc! {r#"
129                    get_enrichment_table_record!(
130                        "test",
131                        {
132                            "surname": "Smith",
133                            "date_of_birth": {
134                                "from": t'1985-01-01T00:00:00Z',
135                                "to": t'1985-12-31T00:00:00Z'
136                            }
137                        }
138                    )
139                "#},
140                result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
141            },
142        ]
143    }
144
145    fn compile(
146        &self,
147        state: &TypeState,
148        ctx: &mut FunctionCompileContext,
149        arguments: ArgumentList,
150    ) -> Compiled {
151        let registry = ctx
152            .get_external_context_mut::<TableRegistry>()
153            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
154
155        let tables = registry
156            .table_ids()
157            .into_iter()
158            .map(Value::from)
159            .collect::<Vec<_>>();
160
161        let table = arguments
162            .required_enum("table", &tables, state)?
163            .try_bytes_utf8_lossy()
164            .expect("table is not valid utf8")
165            .into_owned();
166        let condition = arguments.required_object("condition")?;
167
168        let select = arguments.optional("select");
169
170        let case_sensitive = is_case_sensitive(&arguments, state)?;
171        let wildcard = arguments.optional("wildcard");
172        let index = Some(
173            add_index(registry, &table, case_sensitive, &condition)
174                .map_err(|err| Box::new(err) as Box<_>)?,
175        );
176
177        Ok(GetEnrichmentTableRecordFn {
178            table,
179            condition,
180            index,
181            select,
182            case_sensitive,
183            wildcard,
184            enrichment_tables: registry.as_readonly(),
185        }
186        .as_expr())
187    }
188}
189
190#[derive(Debug, Clone)]
191pub struct GetEnrichmentTableRecordFn {
192    table: String,
193    condition: BTreeMap<KeyString, expression::Expr>,
194    index: Option<IndexHandle>,
195    select: Option<Box<dyn Expression>>,
196    wildcard: Option<Box<dyn Expression>>,
197    case_sensitive: Case,
198    enrichment_tables: TableSearch,
199}
200
201impl FunctionExpression for GetEnrichmentTableRecordFn {
202    fn resolve(&self, ctx: &mut Context) -> Resolved {
203        let condition = self
204            .condition
205            .iter()
206            .map(|(key, value)| {
207                let value = value.resolve(ctx)?;
208                evaluate_condition(key, value)
209            })
210            .collect::<ExpressionResult<Vec<Condition>>>()?;
211
212        let select = self
213            .select
214            .as_ref()
215            .map(|array| array.resolve(ctx))
216            .transpose()?;
217
218        let table = &self.table;
219        let case_sensitive = self.case_sensitive;
220        let wildcard = self
221            .wildcard
222            .as_ref()
223            .map(|array| array.resolve(ctx))
224            .transpose()?;
225        let index = self.index;
226        let enrichment_tables = &self.enrichment_tables;
227
228        get_enrichment_table_record(
229            select,
230            enrichment_tables,
231            table,
232            case_sensitive,
233            wildcard,
234            &condition,
235            index,
236        )
237    }
238
239    fn type_def(&self, _: &TypeState) -> TypeDef {
240        TypeDef::object(Collection::any()).fallible()
241    }
242}
243
#[cfg(test)]
mod tests {
    use vrl::{
        compiler::{TargetValue, prelude::TimeZone, state::RuntimeState},
        value,
        value::Secrets,
    };

    use super::*;
    use crate::test_util::get_table_registry;

    /// Resolving a hand-built `GetEnrichmentTableRecordFn` against the test
    /// registry returns the row supplied by the `dummy1` table.
    #[test]
    fn find_table_row() {
        let registry = get_table_registry();

        // Built directly rather than through `compile`, so the index handle is
        // an arbitrary value.
        let func = GetEnrichmentTableRecordFn {
            table: "dummy1".to_string(),
            condition: BTreeMap::from([(
                "field".into(),
                expression::Literal::from("value").into(),
            )]),
            index: Some(IndexHandle(999)),
            select: None,
            case_sensitive: Case::Sensitive,
            wildcard: None,
            enrichment_tables: registry.as_readonly(),
        };

        // Minimal runtime context: empty event, empty metadata, no secrets.
        let tz = TimeZone::default();
        let object: Value = BTreeMap::new().into();
        let mut target = TargetValue {
            value: object,
            metadata: value!({}),
            secrets: Secrets::new(),
        };
        let mut runtime_state = RuntimeState::default();
        let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);

        // Loading is finished before the lookup is performed.
        registry.finish_load();

        let got = func.resolve(&mut ctx);

        assert_eq!(Ok(value!({ "field": "result" })), got);
    }
}
287}