1use std::collections::HashSet;
2
3use indexmap::IndexMap;
4use vector_lib::config::OutputId;
5
6use super::{ComponentKey, Config, EnrichmentTableOuter};
7
8#[derive(Debug)]
9pub struct ConfigDiff {
10 pub sources: Difference,
11 pub transforms: Difference,
12 pub sinks: Difference,
13 pub enrichment_tables: EnrichmentTableDiff,
14 pub components_to_reload: HashSet<ComponentKey>,
15}
16
17impl ConfigDiff {
18 pub fn initial(initial: &Config) -> Self {
19 Self::new(&Config::default(), initial, HashSet::new())
20 }
21
22 pub fn new(old: &Config, new: &Config, components_to_reload: HashSet<ComponentKey>) -> Self {
23 ConfigDiff {
24 sources: Difference::new(&old.sources, &new.sources, &components_to_reload),
25 transforms: Difference::new(&old.transforms, &new.transforms, &components_to_reload),
26 sinks: Difference::new(&old.sinks, &new.sinks, &components_to_reload),
27 enrichment_tables: EnrichmentTableDiff::new(
28 &old.enrichment_tables,
29 &new.enrichment_tables,
30 &components_to_reload,
31 ),
32 components_to_reload,
33 }
34 }
35
36 pub const fn flip(mut self) -> Self {
38 self.sources.flip();
39 self.transforms.flip();
40 self.sinks.flip();
41 self.enrichment_tables.flip();
42 self
43 }
44
45 pub fn contains(&self, key: &ComponentKey) -> bool {
47 self.sources.contains(key)
48 || self.transforms.contains(key)
49 || self.sinks.contains(key)
50 || self.enrichment_tables.contains(key)
51 }
52
53 pub fn is_changed(&self, key: &ComponentKey) -> bool {
55 self.sources.is_changed(key)
56 || self.transforms.is_changed(key)
57 || self.sinks.is_changed(key)
58 || self.enrichment_tables.is_changed(key)
59 }
60
61 pub fn is_removed(&self, key: &ComponentKey) -> bool {
63 self.sources.is_removed(key)
64 || self.transforms.is_removed(key)
65 || self.sinks.is_removed(key)
66 || self.enrichment_tables.is_removed(key)
67 }
68}
69
70#[derive(Debug)]
71pub struct EnrichmentTableDiff {
72 pub tables: Difference,
74 pub sources: Difference,
76 pub sinks: Difference,
78}
79
80impl EnrichmentTableDiff {
81 fn new(
82 old: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
83 new: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
84 need_change: &HashSet<ComponentKey>,
85 ) -> Self {
86 let tables = Difference::new(old, new, need_change);
87 let sources = Difference::from_enrichment_table_components(
88 old,
89 new,
90 need_change,
91 enrichment_table_source_key,
92 );
93 let sinks = Difference::from_enrichment_table_components(
94 old,
95 new,
96 need_change,
97 enrichment_table_sink_key,
98 );
99
100 Self {
101 tables,
102 sources,
103 sinks,
104 }
105 }
106
107 pub fn any_changed_or_added(&self) -> bool {
109 self.sources.any_changed_or_added() || self.sinks.any_changed_or_added()
110 }
111
112 pub fn any_changed_or_removed(&self) -> bool {
114 self.sources.any_changed_or_removed() || self.sinks.any_changed_or_removed()
115 }
116
117 pub fn contains(&self, id: &ComponentKey) -> bool {
119 self.sources.contains(id) || self.sinks.contains(id)
120 }
121
122 pub fn contains_new(&self, id: &ComponentKey) -> bool {
124 self.sources.contains_new(id) || self.sinks.contains_new(id)
125 }
126
127 pub fn is_changed(&self, key: &ComponentKey) -> bool {
129 self.sources.is_changed(key) || self.sinks.is_changed(key)
130 }
131
132 pub fn is_added(&self, id: &ComponentKey) -> bool {
134 self.sources.is_added(id) || self.sinks.is_added(id)
135 }
136
137 pub fn is_removed(&self, key: &ComponentKey) -> bool {
139 self.sources.is_removed(key) || self.sinks.is_removed(key)
140 }
141
142 const fn flip(&mut self) {
143 self.tables.flip();
144 self.sources.flip();
145 self.sinks.flip();
146 }
147}
148
149#[derive(Debug)]
150pub struct Difference {
151 pub to_remove: HashSet<ComponentKey>,
152 pub to_change: HashSet<ComponentKey>,
153 pub to_add: HashSet<ComponentKey>,
154}
155
156impl Difference {
157 fn new<C>(
158 old: &IndexMap<ComponentKey, C>,
159 new: &IndexMap<ComponentKey, C>,
160 need_change: &HashSet<ComponentKey>,
161 ) -> Self
162 where
163 C: serde::Serialize + serde::Deserialize<'static>,
164 {
165 let old_names = old.keys().cloned().collect::<HashSet<_>>();
166 let new_names = new.keys().cloned().collect::<HashSet<_>>();
167
168 let to_change = old_names
169 .intersection(&new_names)
170 .filter(|&n| {
171 let old_value = serde_json::to_value(&old[n]).unwrap();
178 let new_value = serde_json::to_value(&new[n]).unwrap();
179 old_value != new_value || need_change.contains(n)
180 })
181 .cloned()
182 .collect::<HashSet<_>>();
183
184 let to_remove = &old_names - &new_names;
185 let to_add = &new_names - &old_names;
186
187 Self {
188 to_remove,
189 to_change,
190 to_add,
191 }
192 }
193
194 fn from_enrichment_table_components<F>(
195 old: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
196 new: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
197 need_change: &HashSet<ComponentKey>,
198 component_key: F,
199 ) -> Self
200 where
201 F: Fn(&ComponentKey, &EnrichmentTableOuter<OutputId>) -> Option<ComponentKey>,
202 {
203 let old_table_keys = extract_table_component_keys(old, &component_key);
204 let new_table_keys = extract_table_component_keys(new, &component_key);
205
206 let to_change = old_table_keys
207 .intersection(&new_table_keys)
208 .filter(|(table_key, _derived_component_key)| {
209 let old_value = serde_json::to_value(&old[*table_key]).unwrap();
216 let new_value = serde_json::to_value(&new[*table_key]).unwrap();
217 old_value != new_value || need_change.contains(*table_key)
218 })
219 .cloned()
220 .map(|(_table_key, derived_component_key)| derived_component_key)
221 .collect::<HashSet<_>>();
222
223 let old_component_keys = old_table_keys
225 .into_iter()
226 .map(|(_table_key, component_key)| component_key)
227 .collect::<HashSet<_>>();
228 let new_component_keys = new_table_keys
229 .into_iter()
230 .map(|(_table_key, component_key)| component_key)
231 .collect::<HashSet<_>>();
232
233 let to_remove = &old_component_keys - &new_component_keys;
234 let to_add = &new_component_keys - &old_component_keys;
235
236 Self {
237 to_remove,
238 to_change,
239 to_add,
240 }
241 }
242
243 pub fn any_changed_or_added(&self) -> bool {
245 !(self.to_change.is_empty() && self.to_add.is_empty())
246 }
247
248 pub fn any_changed_or_removed(&self) -> bool {
250 !(self.to_change.is_empty() && self.to_remove.is_empty())
251 }
252
253 pub fn contains(&self, id: &ComponentKey) -> bool {
255 self.to_add.contains(id) || self.to_change.contains(id) || self.to_remove.contains(id)
256 }
257
258 pub fn contains_new(&self, id: &ComponentKey) -> bool {
260 self.to_add.contains(id) || self.to_change.contains(id)
261 }
262
263 pub fn is_changed(&self, key: &ComponentKey) -> bool {
265 self.to_change.contains(key)
266 }
267
268 pub fn is_added(&self, id: &ComponentKey) -> bool {
270 self.to_add.contains(id)
271 }
272
273 pub fn is_removed(&self, key: &ComponentKey) -> bool {
275 self.to_remove.contains(key)
276 }
277
278 const fn flip(&mut self) {
279 std::mem::swap(&mut self.to_remove, &mut self.to_add);
280 }
281
282 pub fn changed_and_added(&self) -> impl Iterator<Item = &ComponentKey> {
283 self.to_change.iter().chain(self.to_add.iter())
284 }
285
286 pub fn removed_and_changed(&self) -> impl Iterator<Item = &ComponentKey> {
287 self.to_change.iter().chain(self.to_remove.iter())
288 }
289}
290
291fn extract_table_component_keys<'a, F>(
293 tables: &'a IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
294 component_key: &F,
295) -> HashSet<(&'a ComponentKey, ComponentKey)>
296where
297 F: Fn(&ComponentKey, &EnrichmentTableOuter<OutputId>) -> Option<ComponentKey>,
298{
299 tables
300 .iter()
301 .filter_map(|(table_key, table)| {
302 component_key(table_key, table).map(|component_key| (table_key, component_key))
303 })
304 .collect()
305}
306
307fn enrichment_table_source_key(
308 table_key: &ComponentKey,
309 table: &EnrichmentTableOuter<OutputId>,
310) -> Option<ComponentKey> {
311 table
312 .as_source(table_key)
313 .map(|(component_key, _)| component_key)
314}
315
316fn enrichment_table_sink_key(
317 table_key: &ComponentKey,
318 table: &EnrichmentTableOuter<OutputId>,
319) -> Option<ComponentKey> {
320 table
321 .as_sink(table_key)
322 .map(|(component_key, _)| component_key)
323}
324
325#[cfg(all(test, feature = "enrichment-tables-memory"))]
326mod tests {
327 use crate::config::ConfigBuilder;
328 use indoc::indoc;
329
330 use super::*;
331
332 #[test]
333 fn diff_enrichment_tables_uses_correct_keys() {
334 let old_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
335 enrichment_tables:
336 memory_table:
337 type: "memory"
338 ttl: 10
339 inputs: []
340 source_config:
341 source_key: "memory_table_source"
342 export_expired_items: true
343 export_interval: 50
344
345 memory_table_unchanged:
346 type: "memory"
347 ttl: 10
348 inputs: []
349
350 memory_table_old:
351 type: "memory"
352 ttl: 10
353 inputs: []
354
355 sources:
356 test:
357 type: "test_basic"
358
359 sinks:
360 test_sink:
361 type: "test_basic"
362 inputs: ["test"]
363 "#})
364 .unwrap()
365 .build()
366 .unwrap();
367
368 let new_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
369 enrichment_tables:
370 memory_table:
371 type: "memory"
372 ttl: 20
373 inputs: []
374 source_config:
375 source_key: "memory_table_source"
376 export_expired_items: true
377 export_interval: 50
378
379 memory_table_unchanged:
380 type: "memory"
381 ttl: 10
382 inputs: []
383
384 memory_table_new:
385 type: "memory"
386 ttl: 1000
387 inputs: []
388
389 sources:
390 test:
391 type: "test_basic"
392
393 sinks:
394 test_sink:
395 type: "test_basic"
396 inputs: ["test"]
397 "#})
398 .unwrap()
399 .build()
400 .unwrap();
401
402 let diff = EnrichmentTableDiff::new(
403 &old_config.enrichment_tables,
404 &new_config.enrichment_tables,
405 &Default::default(),
406 );
407
408 assert_eq!(
409 diff.tables.to_add,
410 HashSet::from_iter(["memory_table_new".into()])
411 );
412 assert_eq!(
413 diff.tables.to_remove,
414 HashSet::from_iter(["memory_table_old".into()])
415 );
416 assert_eq!(
417 diff.tables.to_change,
418 HashSet::from_iter(["memory_table".into()])
419 );
420
421 assert_eq!(
422 diff.sources.to_change,
423 HashSet::from_iter(["memory_table_source".into()])
424 );
425 assert!(diff.sources.to_add.is_empty());
426 assert!(diff.sources.to_remove.is_empty());
427
428 assert_eq!(
429 diff.sinks.to_add,
430 HashSet::from_iter(["memory_table_new".into()])
431 );
432 assert_eq!(
433 diff.sinks.to_remove,
434 HashSet::from_iter(["memory_table_old".into()])
435 );
436 assert_eq!(
437 diff.sinks.to_change,
438 HashSet::from_iter(["memory_table".into()])
439 );
440 }
441
442 #[test]
443 fn diff_enrichment_table_component_helpers_ignore_table_config_keys() {
444 let old_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
445 sources:
446 test:
447 type: "test_basic"
448
449 sinks:
450 test_sink:
451 type: "test_basic"
452 inputs: ["test"]
453 "#})
454 .unwrap()
455 .build()
456 .unwrap();
457
458 let new_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
459 enrichment_tables:
460 file_table:
461 type: "file"
462 file:
463 path: ./tests/data/enrichment.csv
464 encoding:
465 type: "csv"
466 schema:
467 id: integer
468
469 sources:
470 test:
471 type: "test_basic"
472
473 sinks:
474 test_sink:
475 type: "test_basic"
476 inputs: ["test"]
477 "#})
478 .unwrap()
479 .build()
480 .unwrap();
481
482 let diff = EnrichmentTableDiff::new(
483 &old_config.enrichment_tables,
484 &new_config.enrichment_tables,
485 &Default::default(),
486 );
487 let table_key = ComponentKey::from("file_table");
488
489 assert_eq!(diff.tables.to_add, HashSet::from_iter([table_key.clone()]));
490 assert!(diff.sources.to_add.is_empty());
491 assert!(diff.sinks.to_add.is_empty());
492
493 assert!(!diff.any_changed_or_added());
494 assert!(!diff.contains(&table_key));
495 assert!(!diff.contains_new(&table_key));
496 assert!(!diff.is_added(&table_key));
497 }
498
499 #[test]
500 fn diff_enrichment_tables_tracks_source_key_renames() {
501 let old_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
502 enrichment_tables:
503 memory_table:
504 type: "memory"
505 ttl: 10
506 inputs: []
507 source_config:
508 source_key: "memory_table_source_old"
509 export_interval: 50
510
511 sources:
512 test:
513 type: "test_basic"
514
515 sinks:
516 test_sink:
517 type: "test_basic"
518 inputs: ["test"]
519 "#})
520 .unwrap()
521 .build()
522 .unwrap();
523
524 let new_config: Config = serde_yaml::from_str::<ConfigBuilder>(indoc! {r#"
525 enrichment_tables:
526 memory_table:
527 type: "memory"
528 ttl: 10
529 inputs: []
530 source_config:
531 source_key: "memory_table_source_new"
532 export_interval: 50
533
534 sources:
535 test:
536 type: "test_basic"
537
538 sinks:
539 test_sink:
540 type: "test_basic"
541 inputs: ["test"]
542 "#})
543 .unwrap()
544 .build()
545 .unwrap();
546
547 let diff = EnrichmentTableDiff::new(
548 &old_config.enrichment_tables,
549 &new_config.enrichment_tables,
550 &Default::default(),
551 );
552
553 assert_eq!(
554 diff.tables.to_change,
555 HashSet::from_iter(["memory_table".into()])
556 );
557 assert!(diff.tables.to_add.is_empty());
558 assert!(diff.tables.to_remove.is_empty());
559
560 assert_eq!(
561 diff.sources.to_add,
562 HashSet::from_iter(["memory_table_source_new".into()])
563 );
564 assert_eq!(
565 diff.sources.to_remove,
566 HashSet::from_iter(["memory_table_source_old".into()])
567 );
568 assert!(diff.sources.to_change.is_empty());
569
570 assert_eq!(
571 diff.sinks.to_change,
572 HashSet::from_iter(["memory_table".into()])
573 );
574 assert!(diff.sinks.to_add.is_empty());
575 assert!(diff.sinks.to_remove.is_empty());
576 }
577}