Skip to main content

vector/enrichment_tables/memory/
config.rs

1use std::{num::NonZeroU64, sync::Arc};
2
3use async_trait::async_trait;
4use futures::{FutureExt, future};
5use tokio::sync::Mutex;
6use vector_lib::{
7    config::{AcknowledgementsConfig, DataType, Input, LogNamespace},
8    configurable::configurable_component,
9    enrichment::Table,
10    id::ComponentKey,
11    lookup::lookup_v2::OptionalValuePath,
12    schema::{self},
13    sink::VectorSink,
14};
15use vrl::{path::OwnedTargetPath, value::Kind};
16
17use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE};
18use crate::{
19    config::{
20        EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21    },
22    sinks::Healthcheck,
23    sources::Source,
24};
25
// NOTE: the `///` comments on this struct and its fields are inputs to
// `configurable_component` (presumably surfaced as user-facing option
// descriptions — confirm before rewording them). Maintainer-only notes are
// therefore written as plain `//` comments.
/// Configuration for the `memory` enrichment table.
#[configurable_component(enrichment_table("memory"))]
#[derive(Clone)]
pub struct MemoryConfig {
    /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache.
    /// When TTL expires, data behind a specific key in the cache is removed.
    /// TTL is reset when the key is replaced.
    // Falls back to `default_ttl()` (600) when omitted from the configuration.
    #[serde(default = "default_ttl")]
    pub ttl: u64,
    /// The scan interval used to look for expired records. This is provided
    /// as an optimization to ensure that TTL is updated, but without doing
    /// too many cache scans.
    // Falls back to `default_scan_interval()` (30) when omitted; the
    // `NonZeroU64` type makes a zero interval unrepresentable.
    #[serde(default = "default_scan_interval")]
    pub scan_interval: NonZeroU64,
    /// The interval used for making writes visible in the table.
    /// Longer intervals might get better performance,
    /// but there is a longer delay before the data is visible in the table.
    /// Since every TTL scan makes its changes visible, only use this value
    /// if it is shorter than the `scan_interval`.
    ///
    /// By default, all writes are made visible immediately.
    // NOTE(review): the unit is not stated here; presumably seconds, like
    // `ttl` — confirm against the code that consumes this value.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub flush_interval: Option<u64>,
    /// Maximum size of the table in bytes. All insertions that make
    /// this table bigger than the maximum size are rejected.
    ///
    /// By default, there is no size limit.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub max_byte_size: Option<u64>,
    /// The namespace to use for logs. This overrides the global setting.
    // Only read by the `SourceConfig` implementation in this file (`build` and
    // `outputs`), where it is merged with the global log namespace.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
    /// Configuration of internal metrics
    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: InternalMetricsConfig,
    /// Configuration for source functionality.
    // When `None`, `EnrichmentTableConfig::source_config` returns `None`, i.e.
    // the table is not exposed as a source.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub source_config: Option<MemorySourceConfig>,
    /// Field in the incoming value used as the TTL override.
    #[configurable(derived)]
    #[serde(default)]
    pub ttl_field: OptionalValuePath,
    /// Behavior for memory table state on configuration reload.
    #[configurable(derived)]
    #[serde(default)]
    pub reload_behavior: ReloadBehavior,

    // Lazily built table instance (see `get_or_build_memory`). Because it sits
    // behind an `Arc`, every clone of this config — including the clones handed
    // out by `sink_config` and `source_config` — refers to the same slot, so the
    // enrichment table, sink and source all end up with copies of one `Memory`.
    // Never (de)serialized.
    #[serde(skip)]
    memory: Arc<Mutex<Option<Box<Memory>>>>,
}
79
/// Behavior for memory enrichment table state on configuration reload.
// Variants are (de)serialized in kebab-case, i.e. `clear-state` and
// `preserve-state`.
#[configurable_component]
#[derive(Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub enum ReloadBehavior {
    /// Always clear state on configuration reload.
    // Used when `reload_behavior` is not specified.
    #[default]
    ClearState,
    /// Try to preserve state when possible.
    // The only variant for which `MemoryConfig::wants_previous_state` returns
    // `true`.
    PreserveState,
}
91
/// Configuration for memory enrichment table source functionality.
// `deny_unknown_fields` makes deserialization fail on any key that is not one
// of the fields below.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct MemorySourceConfig {
    /// Interval for exporting all data from the table when used as a source.
    // NOTE(review): the unit is not stated here; presumably seconds — confirm
    // against the source implementation.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub export_interval: Option<NonZeroU64>,
    /// Batch size for data exporting. Used to prevent exporting entire table at
    /// once and blocking the system.
    ///
    /// By default, batches are not used and entire table is exported.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub export_batch_size: Option<u64>,
    /// If set to true, all data will be removed from cache after exporting.
    /// Only valid if used as a source and export_interval > 0
    ///
    /// By default, export will not remove data from cache
    #[serde(default = "crate::serde::default_false")]
    pub remove_after_export: bool,
    /// Set to true to export expired items via the `expired` output port.
    /// Expired items ignore other settings and are exported as they are flushed from the table.
    // When true, `SourceConfig::outputs` advertises a second output on the
    // `EXPIRED_ROUTE` port in addition to the default one.
    #[serde(default = "crate::serde::default_false")]
    pub export_expired_items: bool,
    /// Key to use for this component when used as a source. This must be different from the
    /// component key.
    // No serde default: this field is required whenever `source_config` is
    // present. It is converted into the source's `ComponentKey` by
    // `EnrichmentTableConfig::source_config`.
    pub source_key: String,
}
120
121impl PartialEq for MemoryConfig {
122    fn eq(&self, other: &Self) -> bool {
123        self.ttl == other.ttl
124            && self.scan_interval == other.scan_interval
125            && self.flush_interval == other.flush_interval
126    }
127}
128impl Eq for MemoryConfig {}
129
130impl Default for MemoryConfig {
131    fn default() -> Self {
132        Self {
133            ttl: default_ttl(),
134            scan_interval: default_scan_interval(),
135            flush_interval: None,
136            memory: Arc::new(Mutex::new(None)),
137            max_byte_size: None,
138            log_namespace: None,
139            source_config: None,
140            internal_metrics: InternalMetricsConfig::default(),
141            ttl_field: OptionalValuePath::none(),
142            reload_behavior: Default::default(),
143        }
144    }
145}
146
/// Default value for [`MemoryConfig::ttl`]: 600 seconds (ten minutes).
const fn default_ttl() -> u64 {
    const DEFAULT_TTL_SECS: u64 = 10 * 60;
    DEFAULT_TTL_SECS
}
150
/// Default value for [`MemoryConfig::scan_interval`]: 30.
///
/// Built with the checked `NonZeroU64::new` constructor so that no `unsafe`
/// block is needed; the function remains usable in `const` contexts.
const fn default_scan_interval() -> NonZeroU64 {
    match NonZeroU64::new(30) {
        Some(interval) => interval,
        // Unreachable for the literal above; guards against a future edit that
        // changes the default to zero.
        None => panic!("default scan interval must be non-zero"),
    }
}
154
155impl MemoryConfig {
156    pub(super) async fn get_or_build_memory(
157        &self,
158        prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
159    ) -> Memory {
160        let mut boxed_memory = self.memory.lock().await;
161        *boxed_memory
162            .get_or_insert_with(|| {
163                if let Some(prev) = prev_state {
164                    Box::new(Memory::from_previous_state(self.clone(), prev))
165                } else {
166                    Box::new(Memory::new(self.clone()))
167                }
168            })
169            .clone()
170    }
171}
172
173impl EnrichmentTableConfig for MemoryConfig {
174    async fn build(
175        &self,
176        _globals: &crate::config::GlobalOptions,
177        prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
178    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
179        Ok(Box::new(self.get_or_build_memory(prev_state).await))
180    }
181
182    fn wants_previous_state(&self) -> bool {
183        matches!(self.reload_behavior, ReloadBehavior::PreserveState)
184    }
185
186    fn sink_config(
187        &self,
188        default_key: &ComponentKey,
189    ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
190        Some((default_key.clone(), Box::new(self.clone())))
191    }
192
193    fn source_config(
194        &self,
195        _default_key: &ComponentKey,
196    ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
197        let Some(source_config) = &self.source_config else {
198            return None;
199        };
200        Some((
201            source_config.source_key.clone().into(),
202            Box::new(self.clone()),
203        ))
204    }
205}
206
207#[async_trait]
208#[typetag::serde(name = "memory_enrichment_table")]
209impl SinkConfig for MemoryConfig {
210    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
211        let sink = VectorSink::from_event_streamsink(self.get_or_build_memory(None).await);
212
213        Ok((sink, future::ok(()).boxed()))
214    }
215
216    fn input(&self) -> Input {
217        Input::log()
218    }
219
220    fn acknowledgements(&self) -> &AcknowledgementsConfig {
221        &AcknowledgementsConfig::DEFAULT
222    }
223}
224
225#[async_trait]
226#[typetag::serde(name = "memory_enrichment_table")]
227impl SourceConfig for MemoryConfig {
228    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
229        let memory = self.get_or_build_memory(None).await;
230
231        let log_namespace = cx.log_namespace(self.log_namespace);
232
233        Ok(Box::pin(
234            memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
235        ))
236    }
237
238    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
239        let log_namespace = global_log_namespace.merge(self.log_namespace);
240        let schema_definition = match log_namespace {
241            LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
242            LogNamespace::Vector => {
243                schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
244                    .with_meaning(OwnedTargetPath::event_root(), "message")
245            }
246        }
247        .with_standard_vector_source_metadata();
248
249        if self
250            .source_config
251            .as_ref()
252            .map(|c| c.export_expired_items)
253            .unwrap_or_default()
254        {
255            vec![
256                SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()),
257                SourceOutput::new_maybe_logs(DataType::Log, schema_definition)
258                    .with_port(EXPIRED_ROUTE),
259            ]
260        } else {
261            vec![SourceOutput::new_maybe_logs(
262                DataType::Log,
263                schema_definition,
264            )]
265        }
266    }
267
268    fn can_acknowledge(&self) -> bool {
269        false
270    }
271}
272
273impl std::fmt::Debug for MemoryConfig {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        f.debug_struct("MemoryConfig")
276            .field("ttl", &self.ttl)
277            .field("scan_interval", &self.scan_interval)
278            .field("flush_interval", &self.flush_interval)
279            .field("max_byte_size", &self.max_byte_size)
280            .finish()
281    }
282}
283
// NOTE(review): presumably generates the example-configuration implementation
// for `MemoryConfig` from its `Default` impl above; the macro is defined
// elsewhere in the crate — confirm.
impl_generate_config_from_default!(MemoryConfig);