1use std::collections::BTreeMap;
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7 Case, Condition, IndexHandle, TableRegistry, TableSearch,
8 vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11const PARAMETERS: &[Parameter] = &[
12 Parameter::required(
13 "table",
14 kind::BYTES,
15 "The [enrichment table](/docs/reference/glossary/#enrichment-tables) to search.",
16 ),
17 Parameter::required(
18 "condition",
19 kind::OBJECT,
20 "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
21 ),
22 Parameter::optional(
23 "select",
24 kind::ARRAY,
25 "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
26 ),
27 Parameter::optional(
28 "case_sensitive",
29 kind::BOOLEAN,
30 "Whether text fields need to match cases exactly.",
31 )
32 .default(&DEFAULT_CASE_SENSITIVE),
33 Parameter::optional(
34 "wildcard",
35 kind::BYTES,
36 "Value to use for wildcard matching in the search.",
37 ),
38];
39
40fn find_enrichment_table_records(
41 select: Option<Value>,
42 enrichment_tables: &TableSearch,
43 table: &str,
44 case_sensitive: Case,
45 wildcard: Option<Value>,
46 condition: &[Condition],
47 index: Option<IndexHandle>,
48) -> Resolved {
49 let select = select
50 .map(|select| match select {
51 Value::Array(arr) => arr
52 .iter()
53 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
54 .collect::<std::result::Result<Vec<_>, _>>(),
55 value => Err(ValueError::Expected {
56 got: value.kind(),
57 expected: Kind::array(Collection::any()),
58 }),
59 })
60 .transpose()?;
61
62 let data = enrichment_tables
63 .find_table_rows(
64 table,
65 case_sensitive,
66 condition,
67 select.as_ref().map(|select| select.as_ref()),
68 wildcard.as_ref(),
69 index,
70 )?
71 .into_iter()
72 .map(Value::Object)
73 .collect();
74 Ok(Value::Array(data))
75}
76
77#[derive(Clone, Copy, Debug)]
78pub struct FindEnrichmentTableRecords;
79impl Function for FindEnrichmentTableRecords {
80 fn identifier(&self) -> &'static str {
81 "find_enrichment_table_records"
82 }
83
84 fn usage(&self) -> &'static str {
85 const_str::concat!(
86 "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for rows that match the provided condition.\n\n",
87 super::ENRICHMENT_TABLE_EXPLAINER
88 )
89 }
90
91 fn category(&self) -> &'static str {
92 Category::Enrichment.as_ref()
93 }
94
95 fn return_kind(&self) -> u16 {
96 kind::ARRAY
97 }
98
99 fn parameters(&self) -> &'static [Parameter] {
100 PARAMETERS
101 }
102
103 fn examples(&self) -> &'static [Example] {
104 const RESULT: Result<&'static str, &'static str> = Ok(indoc! {r#"
105 [{"id": 1, "firstname": "Bob", "surname": "Smith"},
106 {"id": 2, "firstname": "Fred", "surname": "Smith"}]
107 "#});
108
109 const EXAMPLES: &[Example] = &[
110 example! {
111 title: "Exact match",
112 source: indoc! {r#"
113 find_enrichment_table_records!(
114 "test",
115 {"surname": "Smith"}
116 )
117 "#},
118 result: RESULT,
119 },
120 example! {
121 title: "Case insensitive match",
122 source: indoc! {r#"
123 find_enrichment_table_records!(
124 "test",
125 {"surname": "smith"},
126 case_sensitive: false
127 )
128 "#},
129 result: RESULT,
130 },
131 example! {
132 title: "Wildcard match",
133 source: indoc! {r#"
134 find_enrichment_table_records!(
135 "test",
136 {"firstname": "Bob"},
137 wildcard: "fred",
138 case_sensitive: false
139 )
140 "#},
141 result: RESULT,
142 },
143 example! {
144 title: "Date range search",
145 source: indoc! {r#"
146 find_enrichment_table_records!(
147 "test",
148 {
149 "surname": "Smith",
150 "date_of_birth": {
151 "from": t'1985-01-01T00:00:00Z',
152 "to": t'1985-12-31T00:00:00Z'
153 }
154 }
155 )
156 "#},
157 result: RESULT,
158 },
159 ];
160
161 EXAMPLES
162 }
163
164 fn compile(
165 &self,
166 state: &TypeState,
167 ctx: &mut FunctionCompileContext,
168 arguments: ArgumentList,
169 ) -> Compiled {
170 let registry = ctx
171 .get_external_context_mut::<TableRegistry>()
172 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
173
174 let tables = registry
175 .table_ids()
176 .into_iter()
177 .map(Value::from)
178 .collect::<Vec<_>>();
179
180 let table = arguments
181 .required_enum("table", &tables, state)?
182 .try_bytes_utf8_lossy()
183 .expect("table is not valid utf8")
184 .into_owned();
185 let condition = arguments.required_object("condition")?;
186
187 let select = arguments.optional("select");
188
189 let case_sensitive = is_case_sensitive(&arguments, state)?;
190 let wildcard = arguments.optional("wildcard");
191 let index = Some(
192 add_index(registry, &table, case_sensitive, &condition)
193 .map_err(|err| Box::new(err) as Box<_>)?,
194 );
195
196 Ok(FindEnrichmentTableRecordsFn {
197 table,
198 condition,
199 index,
200 select,
201 case_sensitive,
202 wildcard,
203 enrichment_tables: registry.as_readonly(),
204 }
205 .as_expr())
206 }
207}
208
209#[derive(Debug, Clone)]
210pub struct FindEnrichmentTableRecordsFn {
211 table: String,
212 condition: BTreeMap<KeyString, expression::Expr>,
213 index: Option<IndexHandle>,
214 select: Option<Box<dyn Expression>>,
215 case_sensitive: Case,
216 wildcard: Option<Box<dyn Expression>>,
217 enrichment_tables: TableSearch,
218}
219
220impl FunctionExpression for FindEnrichmentTableRecordsFn {
221 fn resolve(&self, ctx: &mut Context) -> Resolved {
222 let condition = self
223 .condition
224 .iter()
225 .map(|(key, value)| {
226 let value = value.resolve(ctx)?;
227 evaluate_condition(key, value)
228 })
229 .collect::<ExpressionResult<Vec<Condition>>>()?;
230
231 let select = self
232 .select
233 .as_ref()
234 .map(|array| array.resolve(ctx))
235 .transpose()?;
236
237 let table = &self.table;
238 let case_sensitive = self.case_sensitive;
239 let wildcard = self
240 .wildcard
241 .as_ref()
242 .map(|array| array.resolve(ctx))
243 .transpose()?;
244 let index = self.index;
245 let enrichment_tables = &self.enrichment_tables;
246
247 find_enrichment_table_records(
248 select,
249 enrichment_tables,
250 table,
251 case_sensitive,
252 wildcard,
253 &condition,
254 index,
255 )
256 }
257
258 fn type_def(&self, _: &TypeState) -> TypeDef {
259 TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use vrl::{
266 compiler::{TargetValue, TimeZone, state::RuntimeState},
267 value,
268 value::Secrets,
269 };
270
271 use super::*;
272 use crate::test_util::get_table_registry;
273
274 #[test]
275 fn find_table_row() {
276 let registry = get_table_registry();
277 let func = FindEnrichmentTableRecordsFn {
278 table: "dummy1".to_string(),
279 condition: BTreeMap::from([(
280 "field".into(),
281 expression::Literal::from("value").into(),
282 )]),
283 index: Some(IndexHandle(999)),
284 select: None,
285 case_sensitive: Case::Sensitive,
286 wildcard: None,
287 enrichment_tables: registry.as_readonly(),
288 };
289
290 let tz = TimeZone::default();
291 let object: Value = ObjectMap::new().into();
292 let mut target = TargetValue {
293 value: object,
294 metadata: value!({}),
295 secrets: Secrets::new(),
296 };
297 let mut runtime_state = RuntimeState::default();
298 let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
299
300 registry.finish_load();
301
302 let got = func.resolve(&mut ctx);
303
304 assert_eq!(Ok(value![vec![value!({ "field": "result" })]]), got);
305 }
306}