Skip to main content

vector/config/
diff.rs

1use std::collections::HashSet;
2
3use indexmap::IndexMap;
4use vector_lib::config::OutputId;
5
6use super::{ComponentKey, Config, EnrichmentTableOuter};
7
/// Per-category differences between two [`Config`] values.
///
/// Built by [`ConfigDiff::new`], which compares the sources, transforms, sinks and
/// enrichment tables of an old and a new configuration.
#[derive(Debug)]
pub struct ConfigDiff {
    /// Difference between the old and new `sources` maps.
    pub sources: Difference,
    /// Difference between the old and new `transforms` maps.
    pub transforms: Difference,
    /// Difference between the old and new `sinks` maps.
    pub sinks: Difference,
    /// Differences for the enrichment tables and the components derived from them.
    pub enrichment_tables: EnrichmentTableDiff,
    /// Component keys handed to [`ConfigDiff::new`]; a listed key that exists in both
    /// configurations is reported as changed even when its configuration is identical.
    pub components_to_reload: HashSet<ComponentKey>,
}
16
17impl ConfigDiff {
18    pub fn initial(initial: &Config) -> Self {
19        Self::new(&Config::default(), initial, HashSet::new())
20    }
21
22    pub fn new(old: &Config, new: &Config, components_to_reload: HashSet<ComponentKey>) -> Self {
23        ConfigDiff {
24            sources: Difference::new(&old.sources, &new.sources, &components_to_reload),
25            transforms: Difference::new(&old.transforms, &new.transforms, &components_to_reload),
26            sinks: Difference::new(&old.sinks, &new.sinks, &components_to_reload),
27            enrichment_tables: EnrichmentTableDiff::new(
28                &old.enrichment_tables,
29                &new.enrichment_tables,
30                &components_to_reload,
31            ),
32            components_to_reload,
33        }
34    }
35
36    /// Swaps removed with added in Differences.
37    pub const fn flip(mut self) -> Self {
38        self.sources.flip();
39        self.transforms.flip();
40        self.sinks.flip();
41        self.enrichment_tables.flip();
42        self
43    }
44
45    /// Checks whether the given component is present at all.
46    pub fn contains(&self, key: &ComponentKey) -> bool {
47        self.sources.contains(key)
48            || self.transforms.contains(key)
49            || self.sinks.contains(key)
50            || self.enrichment_tables.contains(key)
51    }
52
53    /// Checks whether the given component is changed.
54    pub fn is_changed(&self, key: &ComponentKey) -> bool {
55        self.sources.is_changed(key)
56            || self.transforms.is_changed(key)
57            || self.sinks.is_changed(key)
58            || self.enrichment_tables.is_changed(key)
59    }
60
61    /// Checks whether the given component is removed.
62    pub fn is_removed(&self, key: &ComponentKey) -> bool {
63        self.sources.is_removed(key)
64            || self.transforms.is_removed(key)
65            || self.sinks.is_removed(key)
66            || self.enrichment_tables.is_removed(key)
67    }
68}
69
/// Differences for enrichment tables, tracked separately for the table
/// configurations themselves and for the source and sink components derived from them.
#[derive(Debug)]
pub struct EnrichmentTableDiff {
    /// Difference for the enrichment table configuration keyed by table name.
    pub tables: Difference,
    /// Difference for source components derived from enrichment tables.
    pub sources: Difference,
    /// Difference for sink components derived from enrichment tables.
    pub sinks: Difference,
}
79
80impl EnrichmentTableDiff {
81    fn new(
82        old: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
83        new: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
84        need_change: &HashSet<ComponentKey>,
85    ) -> Self {
86        let tables = Difference::new(old, new, need_change);
87        let sources = Difference::from_enrichment_table_components(
88            old,
89            new,
90            need_change,
91            enrichment_table_source_key,
92        );
93        let sinks = Difference::from_enrichment_table_components(
94            old,
95            new,
96            need_change,
97            enrichment_table_sink_key,
98        );
99
100        Self {
101            tables,
102            sources,
103            sinks,
104        }
105    }
106
107    /// Checks whether or not any enrichment table-derived component is being changed or added.
108    pub fn any_changed_or_added(&self) -> bool {
109        self.sources.any_changed_or_added() || self.sinks.any_changed_or_added()
110    }
111
112    /// Checks whether or not any enrichment table-derived component is being changed or removed.
113    pub fn any_changed_or_removed(&self) -> bool {
114        self.sources.any_changed_or_removed() || self.sinks.any_changed_or_removed()
115    }
116
117    /// Checks whether the given enrichment table-derived component is present at all.
118    pub fn contains(&self, id: &ComponentKey) -> bool {
119        self.sources.contains(id) || self.sinks.contains(id)
120    }
121
122    /// Checks whether the given enrichment table-derived component is present as a change or addition.
123    pub fn contains_new(&self, id: &ComponentKey) -> bool {
124        self.sources.contains_new(id) || self.sinks.contains_new(id)
125    }
126
127    /// Checks whether or not the given enrichment table-derived component is changed.
128    pub fn is_changed(&self, key: &ComponentKey) -> bool {
129        self.sources.is_changed(key) || self.sinks.is_changed(key)
130    }
131
132    /// Checks whether the given enrichment table-derived component is present as an addition.
133    pub fn is_added(&self, id: &ComponentKey) -> bool {
134        self.sources.is_added(id) || self.sinks.is_added(id)
135    }
136
137    /// Checks whether or not the given enrichment table-derived component is removed.
138    pub fn is_removed(&self, key: &ComponentKey) -> bool {
139        self.sources.is_removed(key) || self.sinks.is_removed(key)
140    }
141
142    const fn flip(&mut self) {
143        self.tables.flip();
144        self.sources.flip();
145        self.sinks.flip();
146    }
147}
148
/// Sets of component keys that differ between an old and a new configuration.
#[derive(Debug)]
pub struct Difference {
    /// As first computed: keys present only in the old configuration.
    /// `flip` exchanges this set with `to_add`.
    pub to_remove: HashSet<ComponentKey>,
    /// Keys present in both configurations whose serialized configuration differs
    /// or whose change was explicitly requested. For components derived from
    /// enrichment tables the comparison is made on the owning table.
    pub to_change: HashSet<ComponentKey>,
    /// As first computed: keys present only in the new configuration.
    /// `flip` exchanges this set with `to_remove`.
    pub to_add: HashSet<ComponentKey>,
}
155
156impl Difference {
157    fn new<C>(
158        old: &IndexMap<ComponentKey, C>,
159        new: &IndexMap<ComponentKey, C>,
160        need_change: &HashSet<ComponentKey>,
161    ) -> Self
162    where
163        C: serde::Serialize + serde::Deserialize<'static>,
164    {
165        let old_names = old.keys().cloned().collect::<HashSet<_>>();
166        let new_names = new.keys().cloned().collect::<HashSet<_>>();
167
168        let to_change = old_names
169            .intersection(&new_names)
170            .filter(|&n| {
171                // This is a hack around the issue of comparing two
172                // trait objects. Json is used here over toml since
173                // toml does not support serializing `None`
174                // to_value is used specifically (instead of string)
175                // to avoid problems comparing serialized HashMaps,
176                // which can iterate in varied orders.
177                let old_value = serde_json::to_value(&old[n]).unwrap();
178                let new_value = serde_json::to_value(&new[n]).unwrap();
179                old_value != new_value || need_change.contains(n)
180            })
181            .cloned()
182            .collect::<HashSet<_>>();
183
184        let to_remove = &old_names - &new_names;
185        let to_add = &new_names - &old_names;
186
187        Self {
188            to_remove,
189            to_change,
190            to_add,
191        }
192    }
193
194    fn from_enrichment_table_components<F>(
195        old: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
196        new: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
197        need_change: &HashSet<ComponentKey>,
198        component_key: F,
199    ) -> Self
200    where
201        F: Fn(&ComponentKey, &EnrichmentTableOuter<OutputId>) -> Option<ComponentKey>,
202    {
203        let old_table_keys = extract_table_component_keys(old, &component_key);
204        let new_table_keys = extract_table_component_keys(new, &component_key);
205
206        let to_change = old_table_keys
207            .intersection(&new_table_keys)
208            .filter(|(table_key, _derived_component_key)| {
209                // This is a hack around the issue of comparing two
210                // trait objects. Json is used here over toml since
211                // toml does not support serializing `None`
212                // to_value is used specifically (instead of string)
213                // to avoid problems comparing serialized HashMaps,
214                // which can iterate in varied orders.
215                let old_value = serde_json::to_value(&old[*table_key]).unwrap();
216                let new_value = serde_json::to_value(&new[*table_key]).unwrap();
217                old_value != new_value || need_change.contains(*table_key)
218            })
219            .cloned()
220            .map(|(_table_key, derived_component_key)| derived_component_key)
221            .collect::<HashSet<_>>();
222
223        // Extract only the derived component keys for the final difference calculation
224        let old_component_keys = old_table_keys
225            .into_iter()
226            .map(|(_table_key, component_key)| component_key)
227            .collect::<HashSet<_>>();
228        let new_component_keys = new_table_keys
229            .into_iter()
230            .map(|(_table_key, component_key)| component_key)
231            .collect::<HashSet<_>>();
232
233        let to_remove = &old_component_keys - &new_component_keys;
234        let to_add = &new_component_keys - &old_component_keys;
235
236        Self {
237            to_remove,
238            to_change,
239            to_add,
240        }
241    }
242
243    /// Checks whether or not any components are being changed or added.
244    pub fn any_changed_or_added(&self) -> bool {
245        !(self.to_change.is_empty() && self.to_add.is_empty())
246    }
247
248    /// Checks whether or not any components are being changed or removed.
249    pub fn any_changed_or_removed(&self) -> bool {
250        !(self.to_change.is_empty() && self.to_remove.is_empty())
251    }
252
253    /// Checks whether the given component is present at all.
254    pub fn contains(&self, id: &ComponentKey) -> bool {
255        self.to_add.contains(id) || self.to_change.contains(id) || self.to_remove.contains(id)
256    }
257
258    /// Checks whether the given component is present as a change or addition.
259    pub fn contains_new(&self, id: &ComponentKey) -> bool {
260        self.to_add.contains(id) || self.to_change.contains(id)
261    }
262
263    /// Checks whether or not the given component is changed.
264    pub fn is_changed(&self, key: &ComponentKey) -> bool {
265        self.to_change.contains(key)
266    }
267
268    /// Checks whether the given component is present as an addition.
269    pub fn is_added(&self, id: &ComponentKey) -> bool {
270        self.to_add.contains(id)
271    }
272
273    /// Checks whether or not the given component is removed.
274    pub fn is_removed(&self, key: &ComponentKey) -> bool {
275        self.to_remove.contains(key)
276    }
277
278    const fn flip(&mut self) {
279        std::mem::swap(&mut self.to_remove, &mut self.to_add);
280    }
281
282    pub fn changed_and_added(&self) -> impl Iterator<Item = &ComponentKey> {
283        self.to_change.iter().chain(self.to_add.iter())
284    }
285
286    pub fn removed_and_changed(&self) -> impl Iterator<Item = &ComponentKey> {
287        self.to_change.iter().chain(self.to_remove.iter())
288    }
289}
290
291/// Helper function to extract component keys from enrichment tables.
292fn extract_table_component_keys<'a, F>(
293    tables: &'a IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
294    component_key: &F,
295) -> HashSet<(&'a ComponentKey, ComponentKey)>
296where
297    F: Fn(&ComponentKey, &EnrichmentTableOuter<OutputId>) -> Option<ComponentKey>,
298{
299    tables
300        .iter()
301        .filter_map(|(table_key, table)| {
302            component_key(table_key, table).map(|component_key| (table_key, component_key))
303        })
304        .collect()
305}
306
307fn enrichment_table_source_key(
308    table_key: &ComponentKey,
309    table: &EnrichmentTableOuter<OutputId>,
310) -> Option<ComponentKey> {
311    table
312        .as_source(table_key)
313        .map(|(component_key, _)| component_key)
314}
315
316fn enrichment_table_sink_key(
317    table_key: &ComponentKey,
318    table: &EnrichmentTableOuter<OutputId>,
319) -> Option<ComponentKey> {
320    table
321        .as_sink(table_key)
322        .map(|(component_key, _)| component_key)
323}
324
#[cfg(all(test, feature = "enrichment-tables-memory"))]
mod tests {
    use crate::config::ConfigBuilder;
    use indoc::indoc;

    use super::*;

    /// Parses `yaml` into a `ConfigBuilder` and builds it, panicking on any failure.
    fn build_config(yaml: &str) -> Config {
        let builder = serde_yaml::from_str::<ConfigBuilder>(yaml).unwrap();
        builder.build().unwrap()
    }

    /// Diffs only the enrichment tables of two configs, with no forced changes.
    fn enrichment_diff(old: &Config, new: &Config) -> EnrichmentTableDiff {
        EnrichmentTableDiff::new(
            &old.enrichment_tables,
            &new.enrichment_tables,
            &HashSet::new(),
        )
    }

    /// Builds the expected key set from plain string names.
    fn keys<const N: usize>(names: [&str; N]) -> HashSet<ComponentKey> {
        names.into_iter().map(ComponentKey::from).collect()
    }

    #[test]
    fn diff_enrichment_tables_uses_correct_keys() {
        let old_config = build_config(indoc! {r#"
            enrichment_tables:
              memory_table:
                type: "memory"
                ttl: 10
                inputs: []
                source_config:
                  source_key: "memory_table_source"
                  export_expired_items: true
                  export_interval: 50

              memory_table_unchanged:
                type: "memory"
                ttl: 10
                inputs: []

              memory_table_old:
                type: "memory"
                ttl: 10
                inputs: []

            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let new_config = build_config(indoc! {r#"
            enrichment_tables:
              memory_table:
                type: "memory"
                ttl: 20
                inputs: []
                source_config:
                  source_key: "memory_table_source"
                  export_expired_items: true
                  export_interval: 50

              memory_table_unchanged:
                type: "memory"
                ttl: 10
                inputs: []

              memory_table_new:
                type: "memory"
                ttl: 1000
                inputs: []

            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let diff = enrichment_diff(&old_config, &new_config);

        // Table-level difference is keyed by table name.
        assert_eq!(diff.tables.to_add, keys(["memory_table_new"]));
        assert_eq!(diff.tables.to_remove, keys(["memory_table_old"]));
        assert_eq!(diff.tables.to_change, keys(["memory_table"]));

        // The derived source is keyed by its configured `source_key`.
        assert_eq!(diff.sources.to_change, keys(["memory_table_source"]));
        assert!(diff.sources.to_add.is_empty());
        assert!(diff.sources.to_remove.is_empty());

        // The derived sinks are expected under the table names.
        assert_eq!(diff.sinks.to_add, keys(["memory_table_new"]));
        assert_eq!(diff.sinks.to_remove, keys(["memory_table_old"]));
        assert_eq!(diff.sinks.to_change, keys(["memory_table"]));
    }

    #[test]
    fn diff_enrichment_table_component_helpers_ignore_table_config_keys() {
        let old_config = build_config(indoc! {r#"
            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let new_config = build_config(indoc! {r#"
            enrichment_tables:
              file_table:
                type: "file"
                file:
                  path: ./tests/data/enrichment.csv
                  encoding:
                    type: "csv"
                schema:
                  id: integer

            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let diff = enrichment_diff(&old_config, &new_config);
        let table_key = ComponentKey::from("file_table");

        // The table itself is new, but it is expected to derive no components.
        assert_eq!(diff.tables.to_add, keys(["file_table"]));
        assert!(diff.sources.to_add.is_empty());
        assert!(diff.sinks.to_add.is_empty());

        // The component-level helpers must therefore not report the table key.
        assert!(!diff.any_changed_or_added());
        assert!(!diff.contains(&table_key));
        assert!(!diff.contains_new(&table_key));
        assert!(!diff.is_added(&table_key));
    }

    #[test]
    fn diff_enrichment_tables_tracks_source_key_renames() {
        let old_config = build_config(indoc! {r#"
            enrichment_tables:
              memory_table:
                type: "memory"
                ttl: 10
                inputs: []
                source_config:
                  source_key: "memory_table_source_old"
                  export_interval: 50

            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let new_config = build_config(indoc! {r#"
            enrichment_tables:
              memory_table:
                type: "memory"
                ttl: 10
                inputs: []
                source_config:
                  source_key: "memory_table_source_new"
                  export_interval: 50

            sources:
              test:
                type: "test_basic"

            sinks:
              test_sink:
                type: "test_basic"
                inputs: ["test"]
        "#});

        let diff = enrichment_diff(&old_config, &new_config);

        // Only the table's configuration changed; its key is the same.
        assert_eq!(diff.tables.to_change, keys(["memory_table"]));
        assert!(diff.tables.to_add.is_empty());
        assert!(diff.tables.to_remove.is_empty());

        // A renamed source key shows up as one removal plus one addition.
        assert_eq!(diff.sources.to_add, keys(["memory_table_source_new"]));
        assert_eq!(diff.sources.to_remove, keys(["memory_table_source_old"]));
        assert!(diff.sources.to_change.is_empty());

        // The sink keeps its key and is reported as changed.
        assert_eq!(diff.sinks.to_change, keys(["memory_table"]));
        assert!(diff.sinks.to_add.is_empty());
        assert!(diff.sinks.to_remove.is_empty());
    }
}