vector/enrichment_tables/memory/
config.rs1use std::{num::NonZeroU64, sync::Arc};
2
3use async_trait::async_trait;
4use futures::{FutureExt, future};
5use tokio::sync::Mutex;
6use vector_lib::{
7 config::{AcknowledgementsConfig, DataType, Input, LogNamespace},
8 configurable::configurable_component,
9 enrichment::Table,
10 id::ComponentKey,
11 lookup::lookup_v2::OptionalValuePath,
12 schema::{self},
13 sink::VectorSink,
14};
15use vrl::{path::OwnedTargetPath, value::Kind};
16
17use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE};
18use crate::{
19 config::{
20 EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21 },
22 sinks::Healthcheck,
23 sources::Source,
24};
25
26#[configurable_component(enrichment_table("memory"))]
28#[derive(Clone)]
29pub struct MemoryConfig {
30 #[serde(default = "default_ttl")]
34 pub ttl: u64,
35 #[serde(default = "default_scan_interval")]
39 pub scan_interval: NonZeroU64,
40 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
48 pub flush_interval: Option<u64>,
49 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
54 pub max_byte_size: Option<u64>,
55 #[configurable(metadata(docs::hidden))]
57 #[serde(default)]
58 pub log_namespace: Option<bool>,
59 #[configurable(derived)]
61 #[serde(default)]
62 pub internal_metrics: InternalMetricsConfig,
63 #[configurable(derived)]
65 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
66 pub source_config: Option<MemorySourceConfig>,
67 #[configurable(derived)]
69 #[serde(default)]
70 pub ttl_field: OptionalValuePath,
71 #[configurable(derived)]
73 #[serde(default)]
74 pub reload_behavior: ReloadBehavior,
75
76 #[serde(skip)]
77 memory: Arc<Mutex<Option<Box<Memory>>>>,
78}
79
80#[configurable_component]
82#[derive(Clone, Default)]
83#[serde(rename_all = "kebab-case")]
84pub enum ReloadBehavior {
85 #[default]
87 ClearState,
88 PreserveState,
90}
91
92#[configurable_component]
94#[derive(Clone, Debug, PartialEq, Eq)]
95#[serde(deny_unknown_fields)]
96pub struct MemorySourceConfig {
97 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
99 pub export_interval: Option<NonZeroU64>,
100 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
105 pub export_batch_size: Option<u64>,
106 #[serde(default = "crate::serde::default_false")]
111 pub remove_after_export: bool,
112 #[serde(default = "crate::serde::default_false")]
115 pub export_expired_items: bool,
116 pub source_key: String,
119}
120
121impl PartialEq for MemoryConfig {
122 fn eq(&self, other: &Self) -> bool {
123 self.ttl == other.ttl
124 && self.scan_interval == other.scan_interval
125 && self.flush_interval == other.flush_interval
126 }
127}
128impl Eq for MemoryConfig {}
129
130impl Default for MemoryConfig {
131 fn default() -> Self {
132 Self {
133 ttl: default_ttl(),
134 scan_interval: default_scan_interval(),
135 flush_interval: None,
136 memory: Arc::new(Mutex::new(None)),
137 max_byte_size: None,
138 log_namespace: None,
139 source_config: None,
140 internal_metrics: InternalMetricsConfig::default(),
141 ttl_field: OptionalValuePath::none(),
142 reload_behavior: Default::default(),
143 }
144 }
145}
146
147const fn default_ttl() -> u64 {
148 600
149}
150
151const fn default_scan_interval() -> NonZeroU64 {
152 unsafe { NonZeroU64::new_unchecked(30) }
153}
154
155impl MemoryConfig {
156 pub(super) async fn get_or_build_memory(
157 &self,
158 prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
159 ) -> Memory {
160 let mut boxed_memory = self.memory.lock().await;
161 *boxed_memory
162 .get_or_insert_with(|| {
163 if let Some(prev) = prev_state {
164 Box::new(Memory::from_previous_state(self.clone(), prev))
165 } else {
166 Box::new(Memory::new(self.clone()))
167 }
168 })
169 .clone()
170 }
171}
172
173impl EnrichmentTableConfig for MemoryConfig {
174 async fn build(
175 &self,
176 _globals: &crate::config::GlobalOptions,
177 prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
178 ) -> crate::Result<Box<dyn Table + Send + Sync>> {
179 Ok(Box::new(self.get_or_build_memory(prev_state).await))
180 }
181
182 fn wants_previous_state(&self) -> bool {
183 matches!(self.reload_behavior, ReloadBehavior::PreserveState)
184 }
185
186 fn sink_config(
187 &self,
188 default_key: &ComponentKey,
189 ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
190 Some((default_key.clone(), Box::new(self.clone())))
191 }
192
193 fn source_config(
194 &self,
195 _default_key: &ComponentKey,
196 ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
197 let Some(source_config) = &self.source_config else {
198 return None;
199 };
200 Some((
201 source_config.source_key.clone().into(),
202 Box::new(self.clone()),
203 ))
204 }
205}
206
207#[async_trait]
208#[typetag::serde(name = "memory_enrichment_table")]
209impl SinkConfig for MemoryConfig {
210 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
211 let sink = VectorSink::from_event_streamsink(self.get_or_build_memory(None).await);
212
213 Ok((sink, future::ok(()).boxed()))
214 }
215
216 fn input(&self) -> Input {
217 Input::log()
218 }
219
220 fn acknowledgements(&self) -> &AcknowledgementsConfig {
221 &AcknowledgementsConfig::DEFAULT
222 }
223}
224
225#[async_trait]
226#[typetag::serde(name = "memory_enrichment_table")]
227impl SourceConfig for MemoryConfig {
228 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
229 let memory = self.get_or_build_memory(None).await;
230
231 let log_namespace = cx.log_namespace(self.log_namespace);
232
233 Ok(Box::pin(
234 memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
235 ))
236 }
237
238 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
239 let log_namespace = global_log_namespace.merge(self.log_namespace);
240 let schema_definition = match log_namespace {
241 LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
242 LogNamespace::Vector => {
243 schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
244 .with_meaning(OwnedTargetPath::event_root(), "message")
245 }
246 }
247 .with_standard_vector_source_metadata();
248
249 if self
250 .source_config
251 .as_ref()
252 .map(|c| c.export_expired_items)
253 .unwrap_or_default()
254 {
255 vec![
256 SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()),
257 SourceOutput::new_maybe_logs(DataType::Log, schema_definition)
258 .with_port(EXPIRED_ROUTE),
259 ]
260 } else {
261 vec![SourceOutput::new_maybe_logs(
262 DataType::Log,
263 schema_definition,
264 )]
265 }
266 }
267
268 fn can_acknowledge(&self) -> bool {
269 false
270 }
271}
272
273impl std::fmt::Debug for MemoryConfig {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 f.debug_struct("MemoryConfig")
276 .field("ttl", &self.ttl)
277 .field("scan_interval", &self.scan_interval)
278 .field("flush_interval", &self.flush_interval)
279 .field("max_byte_size", &self.max_byte_size)
280 .finish()
281 }
282}
283
284impl_generate_config_from_default!(MemoryConfig);