// vector/sinks/databricks_zerobus/config.rs

//! Configuration for the Zerobus sink.
2
3use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8    prelude::*,
9    util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink};
13
14/// Authentication configuration for Databricks.
15#[configurable_component]
16#[derive(Clone, Debug)]
17#[serde(tag = "strategy", rename_all = "snake_case")]
18#[configurable(metadata(
19    docs::enum_tag_description = "The authentication strategy to use for Databricks."
20))]
21pub enum DatabricksAuthentication {
22    /// Authenticate using OAuth 2.0 client credentials.
23    #[serde(rename = "oauth")]
24    OAuth {
25        /// OAuth 2.0 client ID.
26        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
27        #[configurable(metadata(docs::examples = "abc123..."))]
28        client_id: SensitiveString,
29
30        /// OAuth 2.0 client secret.
31        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
32        #[configurable(metadata(docs::examples = "secret123..."))]
33        client_secret: SensitiveString,
34    },
35}
36
37impl DatabricksAuthentication {
38    /// Extract the client ID and client secret as string references.
39    pub fn credentials(&self) -> (&str, &str) {
40        match self {
41            DatabricksAuthentication::OAuth {
42                client_id,
43                client_secret,
44            } => (client_id.inner(), client_secret.inner()),
45        }
46    }
47}
48
49/// Arrow IPC compression codec for Zerobus Arrow Flight payloads.
50#[configurable_component]
51#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
52#[serde(rename_all = "snake_case")]
53pub enum Compression {
54    /// No compression.
55    #[default]
56    None,
57    /// LZ4 frame compression.
58    Lz4Frame,
59    /// Zstandard compression.
60    Zstd,
61}
62
63impl From<Compression> for Option<arrow::ipc::CompressionType> {
64    fn from(value: Compression) -> Self {
65        match value {
66            Compression::None => None,
67            Compression::Lz4Frame => Some(arrow::ipc::CompressionType::LZ4_FRAME),
68            Compression::Zstd => Some(arrow::ipc::CompressionType::ZSTD),
69        }
70    }
71}
72
73/// Zerobus stream configuration options.
74///
75/// This is a thin wrapper around the SDK's `StreamConfigurationOptions` with Vector-specific
76/// configuration attributes and custom defaults suitable for Vector's use case.
77#[configurable_component]
78#[derive(Clone, Debug)]
79#[serde(deny_unknown_fields)]
80pub struct ZerobusStreamOptions {
81    /// Timeout in milliseconds for flush operations.
82    #[serde(default = "default_flush_timeout_ms")]
83    #[configurable(metadata(docs::examples = 30000))]
84    pub flush_timeout_ms: u64,
85
86    /// Timeout in milliseconds for server acknowledgements.
87    #[serde(default = "default_server_ack_timeout_ms")]
88    #[configurable(metadata(docs::examples = 60000))]
89    pub server_lack_of_ack_timeout_ms: u64,
90
91    /// Arrow IPC compression for Flight payloads. Defaults to no compression.
92    #[configurable(derived)]
93    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
94    pub compression: Compression,
95}
96
97impl Default for ZerobusStreamOptions {
98    fn default() -> Self {
99        Self {
100            flush_timeout_ms: default_flush_timeout_ms(),
101            server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
102            compression: Compression::None,
103        }
104    }
105}
106
107/// Configuration for the Databricks Zerobus sink.
108#[configurable_component(sink(
109    "databricks_zerobus",
110    "Stream observability data to Databricks Unity Catalog via Zerobus."
111))]
112#[derive(Clone, Debug)]
113#[serde(deny_unknown_fields)]
114pub struct ZerobusSinkConfig {
115    /// The Zerobus ingestion endpoint URL.
116    ///
117    /// This should be the full URL to the Zerobus ingestion service.
118    ///
119    /// See the [Databricks Zerobus documentation][zerobus_endpoint] to find your workspace URL and
120    /// Zerobus ingest endpoint.
121    ///
122    /// [zerobus_endpoint]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#get-your-workspace-url-and-zerobus-ingest-endpoint
123    #[configurable(metadata(
124        docs::examples = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
125    ))]
126    #[configurable(metadata(
127        docs::examples = "https://6543210987654321.zerobus.us-east-1.cloud.databricks.com"
128    ))]
129    pub ingestion_endpoint: String,
130
131    /// The Unity Catalog table name to write to.
132    ///
133    /// This should be in the format `catalog.schema.table`.
134    ///
135    /// See the [Databricks Zerobus documentation][zerobus_table] to create or identify the target
136    /// table.
137    ///
138    /// [zerobus_table]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#create-or-identify-the-target-table
139    #[configurable(metadata(docs::examples = "main.default.logs"))]
140    #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
141    pub table_name: String,
142
143    /// The Unity Catalog endpoint URL.
144    ///
145    /// This is used for authentication and table metadata.
146    ///
147    /// See the [Databricks Zerobus documentation][zerobus_endpoint] to find your workspace URL and
148    /// Zerobus ingest endpoint.
149    ///
150    /// [zerobus_endpoint]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#get-your-workspace-url-and-zerobus-ingest-endpoint
151    #[configurable(metadata(docs::examples = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"))]
152    #[configurable(metadata(docs::examples = "https://dbc-f6e5d4c3-b2a1.cloud.databricks.com"))]
153    pub unity_catalog_endpoint: String,
154
155    /// Databricks authentication configuration.
156    ///
157    /// See the [Databricks Zerobus documentation][zerobus_service_principal] to create a service
158    /// principal and grant it permissions to write to the target table.
159    ///
160    /// [zerobus_service_principal]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#create-a-service-principal-and-grant-permissions
161    #[configurable(derived)]
162    pub auth: DatabricksAuthentication,
163
164    /// Custom identifier appended to the `user-agent` header sent to Databricks.
165    ///
166    /// The header always includes `Vector/<version>`; when set, this value is
167    /// appended after it (e.g. `my-service/1.2`).
168    #[serde(default)]
169    #[configurable(metadata(docs::examples = "my-service/1.2"))]
170    pub user_agent: Option<String>,
171
172    #[configurable(derived)]
173    #[serde(default)]
174    pub stream_options: ZerobusStreamOptions,
175
176    #[configurable(derived)]
177    #[serde(default)]
178    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
179
180    #[configurable(derived)]
181    #[serde(default)]
182    pub request: TowerRequestConfig,
183
184    #[configurable(derived)]
185    #[serde(
186        default,
187        deserialize_with = "crate::serde::bool_or_struct",
188        skip_serializing_if = "crate::serde::is_default"
189    )]
190    pub acknowledgements: AcknowledgementsConfig,
191}
192
193impl GenerateConfig for ZerobusSinkConfig {
194    fn generate_config() -> toml::Value {
195        toml::Value::try_from(Self {
196            ingestion_endpoint: "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
197                .to_string(),
198            table_name: "main.default.logs".to_string(),
199            unity_catalog_endpoint: "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com".to_string(),
200            auth: DatabricksAuthentication::OAuth {
201                client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
202                client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
203            },
204            user_agent: None,
205            stream_options: ZerobusStreamOptions::default(),
206            batch: BatchConfig::default(),
207            request: TowerRequestConfig::default(),
208            acknowledgements: AcknowledgementsConfig::default(),
209        })
210        .unwrap()
211    }
212}
213
214#[async_trait::async_trait]
215#[typetag::serde(name = "databricks_zerobus")]
216impl SinkConfig for ZerobusSinkConfig {
217    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
218        self.validate()?;
219
220        let service = ZerobusService::new(self.clone(), cx.proxy()).await?;
221        let healthcheck_service = service.clone();
222
223        let request_limits = self.request.into_settings();
224
225        let sink = ZerobusSink::new(service, request_limits, self.batch)?;
226
227        let healthcheck = async move {
228            healthcheck_service
229                .ensure_stream()
230                .await
231                .map_err(|e| e.into())
232        };
233
234        Ok((
235            VectorSink::from_event_streamsink(sink),
236            Box::pin(healthcheck),
237        ))
238    }
239
240    fn input(&self) -> Input {
241        Input::log()
242    }
243
244    fn acknowledgements(&self) -> &AcknowledgementsConfig {
245        &self.acknowledgements
246    }
247}
248
249impl ZerobusSinkConfig {
250    pub fn validate(&self) -> Result<(), ZerobusSinkError> {
251        if self.ingestion_endpoint.is_empty() {
252            return Err(ZerobusSinkError::ConfigError {
253                message: "ingestion_endpoint cannot be empty".to_string(),
254            });
255        }
256
257        if self.table_name.is_empty() {
258            return Err(ZerobusSinkError::ConfigError {
259                message: "table_name cannot be empty".to_string(),
260            });
261        }
262
263        let parts: Vec<&str> = self.table_name.split('.').collect();
264        if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
265            return Err(ZerobusSinkError::ConfigError {
266                message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
267                    .to_string(),
268            });
269        }
270
271        if self.unity_catalog_endpoint.is_empty() {
272            return Err(ZerobusSinkError::ConfigError {
273                message: "unity_catalog_endpoint cannot be empty".to_string(),
274            });
275        }
276
277        // Validate authentication credentials
278        match &self.auth {
279            DatabricksAuthentication::OAuth {
280                client_id,
281                client_secret,
282            } => {
283                if client_id.inner().is_empty() {
284                    return Err(ZerobusSinkError::ConfigError {
285                        message: "OAuth client_id cannot be empty".to_string(),
286                    });
287                }
288                if client_secret.inner().is_empty() {
289                    return Err(ZerobusSinkError::ConfigError {
290                        message: "OAuth client_secret cannot be empty".to_string(),
291                    });
292                }
293            }
294        }
295
296        if let Some(max_bytes) = self.batch.max_bytes {
297            // Zerobus SDK limits max bytes to 10MB. This cap is a coarse safety
298            // limit: it's measured against Vector's pre-serialization (estimated
299            // JSON) sizing, not the encoded Arrow bytes the SDK actually sends.
300            // The two differ — for numeric-heavy schemas the encoded Arrow batch
301            // can be larger than the source events — so a batch configured right
302            // at the boundary may still exceed the SDK's limit; lower max_bytes to
303            // leave headroom if you see SDK-side size errors.
304            if max_bytes > 10_000_000 {
305                return Err(ZerobusSinkError::ConfigError {
306                    message: "max_bytes must be less than or equal to 10MB".to_string(),
307                });
308            }
309        }
310
311        Ok(())
312    }
313
314    /// The user-agent suffix to hand the Zerobus SDK: `Vector/<version>`
315    /// alone, or with the user's configured `user_agent` appended. The SDK
316    /// prepends its own `zerobus-sdk-rs/<version>` prefix to this value.
317    pub fn user_agent_suffix(&self) -> String {
318        let vector = format!("Vector/{}", crate::vector_version());
319        match self.user_agent.as_deref().filter(|s| !s.is_empty()) {
320            Some(ua) => format!("{vector} {ua}"),
321            None => vector,
322        }
323    }
324}
325
// Default value functions

/// Default for `ZerobusStreamOptions::flush_timeout_ms`: 30 seconds, in milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30_000
}
330
/// Default for `ZerobusStreamOptions::server_lack_of_ack_timeout_ms`: 60 seconds, in
/// milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60_000
}
334
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::sensitive_string::SensitiveString;

    /// A configuration that passes `validate()`; tests mutate one field at a time.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            user_agent: None,
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Asserts that `config.validate()` fails with a `ConfigError` whose
    /// message contains `expected`.
    #[track_caller]
    fn assert_config_error(config: &ZerobusSinkConfig, expected: &str) {
        match config.validate() {
            Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError {
                message,
            }) => assert!(
                message.contains(expected),
                "expected ConfigError containing {expected:?}, got {message:?}"
            ),
            other => panic!("expected ConfigError containing {expected:?}, got {other:?}"),
        }
    }

    #[test]
    fn test_config_validation_success() {
        let config = create_test_config();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = "".to_string();

        assert_config_error(&config, "ingestion_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = "".to_string();

        assert_config_error(&config, "table_name cannot be empty");
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        config.table_name = "invalid_table".to_string(); // Missing dots

        assert_config_error(&config, "catalog.schema.table");
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();

            // Checked separately so a failure names the offending input.
            assert!(
                config.validate().is_err(),
                "expected error for table_name={bad:?}"
            );
            assert_config_error(&config, "catalog.schema.table");
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = "".to_string();

        assert_config_error(&config, "unity_catalog_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("".to_string()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };

        assert_config_error(&config, "OAuth client_id cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from("".to_string()),
        };

        assert_config_error(&config, "OAuth client_secret cannot be empty");
    }

    #[test]
    fn test_config_validation_max_bytes_limit() {
        let mut config = create_test_config();

        // Exactly at the cap is accepted.
        config.batch.max_bytes = Some(10_000_000);
        assert!(config.validate().is_ok());

        // One byte over is rejected.
        config.batch.max_bytes = Some(10_000_001);
        assert_config_error(&config, "max_bytes must be less than or equal to 10MB");
    }

    #[test]
    fn test_stream_options_compression_deserializes() {
        let opts: ZerobusStreamOptions =
            serde_json::from_str(r#"{"compression":"zstd"}"#).expect("should parse zstd");
        assert_eq!(opts.compression, Compression::Zstd);

        let opts: ZerobusStreamOptions =
            serde_json::from_str(r#"{"compression":"lz4_frame"}"#).expect("should parse lz4_frame");
        assert_eq!(opts.compression, Compression::Lz4Frame);

        let opts: ZerobusStreamOptions =
            serde_json::from_str(r#"{"compression":"none"}"#).expect("should parse none");
        assert_eq!(opts.compression, Compression::None);

        // Omitting the field leaves compression disabled.
        let opts: ZerobusStreamOptions = serde_json::from_str("{}").expect("should parse empty");
        assert_eq!(opts.compression, Compression::None);
    }

    #[test]
    fn test_compression_maps_to_arrow_ipc() {
        assert_eq!(
            Option::<arrow::ipc::CompressionType>::from(Compression::None),
            None,
        );
        assert_eq!(
            Option::<arrow::ipc::CompressionType>::from(Compression::Lz4Frame),
            Some(arrow::ipc::CompressionType::LZ4_FRAME),
        );
        assert_eq!(
            Option::<arrow::ipc::CompressionType>::from(Compression::Zstd),
            Some(arrow::ipc::CompressionType::ZSTD),
        );
    }

    /// Guards the `arrow/ipc_compression` feature: lz4/zstd error at runtime unless
    /// arrow is built with the codecs. arrow-ipc only validates when writing a
    /// compressed buffer, so this round-trips a batch through each codec.
    #[test]
    fn test_arrow_ipc_compression_codecs_are_enabled() {
        use std::sync::Arc;

        use arrow::array::Int32Array;
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
        use arrow::record_batch::RecordBatch;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from((0..1024).collect::<Vec<_>>()))],
        )
        .expect("batch should build");

        for codec in [Compression::Lz4Frame, Compression::Zstd] {
            let compression: Option<arrow::ipc::CompressionType> = codec.into();
            let options = IpcWriteOptions::default()
                .try_with_compression(compression)
                .unwrap_or_else(|e| panic!("{codec:?} not enabled in arrow build: {e}"));

            let mut buf = Vec::new();
            let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, options)
                .unwrap_or_else(|e| panic!("writer for {codec:?} should build: {e}"));
            writer
                .write(&batch)
                .unwrap_or_else(|e| panic!("writing compressed batch for {codec:?} failed: {e}"));
            writer
                .finish()
                .unwrap_or_else(|e| panic!("finishing stream for {codec:?} failed: {e}"));

            assert!(!buf.is_empty(), "{codec:?} produced no output");
        }
    }

    /// When `batch.max_bytes` is `None` (user omitted the field or set it to `null`),
    /// `into_batcher_settings()` must merge it against
    /// `RealtimeSizeBasedDefaultBatchSettings::MAX_BYTES` (10MB) — never unbounded.
    /// This guarantees the Zerobus SDK's 10MB limit cannot be exceeded at runtime
    /// even without an explicit user cap.
    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }

    #[test]
    fn test_user_agent_suffix_without_user_value() {
        let config = create_test_config();
        let suffix = config.user_agent_suffix();
        assert!(
            suffix.starts_with("Vector/"),
            "expected Vector/<version> prefix, got {suffix:?}"
        );
        // No user value configured, so nothing is appended.
        assert!(
            !suffix.contains(' '),
            "unexpected appended value in {suffix:?}"
        );
    }

    #[test]
    fn test_user_agent_suffix_with_user_value() {
        let mut config = create_test_config();
        config.user_agent = Some("my-service/1.2".to_string());
        let suffix = config.user_agent_suffix();
        assert!(
            suffix.starts_with("Vector/"),
            "expected Vector/<version> prefix, got {suffix:?}"
        );
        assert!(
            suffix.ends_with(" my-service/1.2"),
            "expected user value appended, got {suffix:?}"
        );
    }

    #[test]
    fn test_user_agent_suffix_empty_user_value_ignored() {
        let mut config = create_test_config();
        config.user_agent = Some(String::new());
        let suffix = config.user_agent_suffix();
        // An empty string is treated the same as no value: no trailing space.
        assert!(
            !suffix.contains(' '),
            "empty user_agent should be ignored, got {suffix:?}"
        );
    }
}