1use std::collections::BTreeMap;
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7 Case, Condition, IndexHandle, TableRegistry, TableSearch,
8 vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11const PARAMETERS: &[Parameter] = &[
12 Parameter::required(
13 "table",
14 kind::BYTES,
15 "The [enrichment table](/docs/reference/glossary/#enrichment-tables) to search.",
16 ),
17 Parameter::required(
18 "condition",
19 kind::OBJECT,
20 "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
21 ),
22 Parameter::optional(
23 "select",
24 kind::ARRAY,
25 "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
26 ),
27 Parameter::optional(
28 "case_sensitive",
29 kind::BOOLEAN,
30 "Whether the text fields match the case exactly.",
31 )
32 .default(&DEFAULT_CASE_SENSITIVE),
33 Parameter::optional(
34 "wildcard",
35 kind::BYTES,
36 "Value to use for wildcard matching in the search.",
37 ),
38];
39
40fn get_enrichment_table_record(
41 select: Option<Value>,
42 enrichment_tables: &TableSearch,
43 table: &str,
44 case_sensitive: Case,
45 wildcard: Option<Value>,
46 condition: &[Condition],
47 index: Option<IndexHandle>,
48) -> Resolved {
49 let select = select
50 .map(|array| match array {
51 Value::Array(arr) => arr
52 .iter()
53 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
54 .collect::<std::result::Result<Vec<_>, _>>(),
55 value => Err(ValueError::Expected {
56 got: value.kind(),
57 expected: Kind::array(Collection::any()),
58 }),
59 })
60 .transpose()?;
61
62 let data = enrichment_tables.find_table_row(
63 table,
64 case_sensitive,
65 condition,
66 select.as_ref().map(|select| select.as_ref()),
67 wildcard.as_ref(),
68 index,
69 )?;
70
71 Ok(Value::Object(data))
72}
73
74#[derive(Clone, Copy, Debug)]
75pub struct GetEnrichmentTableRecord;
76impl Function for GetEnrichmentTableRecord {
77 fn identifier(&self) -> &'static str {
78 "get_enrichment_table_record"
79 }
80
81 fn usage(&self) -> &'static str {
82 const USAGE: &str = const_str::concat!(
83 "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for a row that matches the provided condition. A single row must be matched. If no rows are found or more than one row is found, an error is returned.\n\n",
84 super::ENRICHMENT_TABLE_EXPLAINER
85 );
86 USAGE
87 }
88
89 fn internal_failure_reasons(&self) -> &'static [&'static str] {
90 &[
91 "The row is not found.",
92 "Multiple rows are found that match the condition.",
93 ]
94 }
95
96 fn category(&self) -> &'static str {
97 Category::Enrichment.as_ref()
98 }
99
100 fn return_kind(&self) -> u16 {
101 kind::OBJECT
102 }
103
104 fn parameters(&self) -> &'static [Parameter] {
105 PARAMETERS
106 }
107
108 fn examples(&self) -> &'static [Example] {
109 &[
110 example! {
111 title: "Exact match",
112 source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
113 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
114 },
115 example! {
116 title: "Case insensitive match",
117 source: indoc !{r#"
118 get_enrichment_table_record!(
119 "test",
120 {"surname": "bob", "firstname": "John"},
121 case_sensitive: false
122 )
123 "#},
124 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
125 },
126 example! {
127 title: "Date range search",
128 source: indoc! {r#"
129 get_enrichment_table_record!(
130 "test",
131 {
132 "surname": "Smith",
133 "date_of_birth": {
134 "from": t'1985-01-01T00:00:00Z',
135 "to": t'1985-12-31T00:00:00Z'
136 }
137 }
138 )
139 "#},
140 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
141 },
142 ]
143 }
144
145 fn compile(
146 &self,
147 state: &TypeState,
148 ctx: &mut FunctionCompileContext,
149 arguments: ArgumentList,
150 ) -> Compiled {
151 let registry = ctx
152 .get_external_context_mut::<TableRegistry>()
153 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
154
155 let tables = registry
156 .table_ids()
157 .into_iter()
158 .map(Value::from)
159 .collect::<Vec<_>>();
160
161 let table = arguments
162 .required_enum("table", &tables, state)?
163 .try_bytes_utf8_lossy()
164 .expect("table is not valid utf8")
165 .into_owned();
166 let condition = arguments.required_object("condition")?;
167
168 let select = arguments.optional("select");
169
170 let case_sensitive = is_case_sensitive(&arguments, state)?;
171 let wildcard = arguments.optional("wildcard");
172 let index = Some(
173 add_index(registry, &table, case_sensitive, &condition)
174 .map_err(|err| Box::new(err) as Box<_>)?,
175 );
176
177 Ok(GetEnrichmentTableRecordFn {
178 table,
179 condition,
180 index,
181 select,
182 case_sensitive,
183 wildcard,
184 enrichment_tables: registry.as_readonly(),
185 }
186 .as_expr())
187 }
188}
189
190#[derive(Debug, Clone)]
191pub struct GetEnrichmentTableRecordFn {
192 table: String,
193 condition: BTreeMap<KeyString, expression::Expr>,
194 index: Option<IndexHandle>,
195 select: Option<Box<dyn Expression>>,
196 wildcard: Option<Box<dyn Expression>>,
197 case_sensitive: Case,
198 enrichment_tables: TableSearch,
199}
200
201impl FunctionExpression for GetEnrichmentTableRecordFn {
202 fn resolve(&self, ctx: &mut Context) -> Resolved {
203 let condition = self
204 .condition
205 .iter()
206 .map(|(key, value)| {
207 let value = value.resolve(ctx)?;
208 evaluate_condition(key, value)
209 })
210 .collect::<ExpressionResult<Vec<Condition>>>()?;
211
212 let select = self
213 .select
214 .as_ref()
215 .map(|array| array.resolve(ctx))
216 .transpose()?;
217
218 let table = &self.table;
219 let case_sensitive = self.case_sensitive;
220 let wildcard = self
221 .wildcard
222 .as_ref()
223 .map(|array| array.resolve(ctx))
224 .transpose()?;
225 let index = self.index;
226 let enrichment_tables = &self.enrichment_tables;
227
228 get_enrichment_table_record(
229 select,
230 enrichment_tables,
231 table,
232 case_sensitive,
233 wildcard,
234 &condition,
235 index,
236 )
237 }
238
239 fn type_def(&self, _: &TypeState) -> TypeDef {
240 TypeDef::object(Collection::any()).fallible()
241 }
242}
243
244#[cfg(test)]
245mod tests {
246 use vrl::{
247 compiler::{TargetValue, prelude::TimeZone, state::RuntimeState},
248 value,
249 value::Secrets,
250 };
251
252 use super::*;
253 use crate::test_util::get_table_registry;
254
255 #[test]
256 fn find_table_row() {
257 let registry = get_table_registry();
258 let func = GetEnrichmentTableRecordFn {
259 table: "dummy1".to_string(),
260 condition: BTreeMap::from([(
261 "field".into(),
262 expression::Literal::from("value").into(),
263 )]),
264 index: Some(IndexHandle(999)),
265 select: None,
266 case_sensitive: Case::Sensitive,
267 wildcard: None,
268 enrichment_tables: registry.as_readonly(),
269 };
270
271 let tz = TimeZone::default();
272 let object: Value = BTreeMap::new().into();
273 let mut target = TargetValue {
274 value: object,
275 metadata: value!({}),
276 secrets: Secrets::new(),
277 };
278 let mut runtime_state = RuntimeState::default();
279 let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
280
281 registry.finish_load();
282
283 let got = func.resolve(&mut ctx);
284
285 assert_eq!(Ok(value! ({ "field": "result" })), got);
286 }
287}