vector/sinks/databricks_zerobus/
config.rs1use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8 prelude::*,
9 util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink};
13
14#[configurable_component]
16#[derive(Clone, Debug)]
17#[serde(tag = "strategy", rename_all = "snake_case")]
18#[configurable(metadata(
19 docs::enum_tag_description = "The authentication strategy to use for Databricks."
20))]
21pub enum DatabricksAuthentication {
22 #[serde(rename = "oauth")]
24 OAuth {
25 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
27 #[configurable(metadata(docs::examples = "abc123..."))]
28 client_id: SensitiveString,
29
30 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
32 #[configurable(metadata(docs::examples = "secret123..."))]
33 client_secret: SensitiveString,
34 },
35}
36
37impl DatabricksAuthentication {
38 pub fn credentials(&self) -> (&str, &str) {
40 match self {
41 DatabricksAuthentication::OAuth {
42 client_id,
43 client_secret,
44 } => (client_id.inner(), client_secret.inner()),
45 }
46 }
47}
48
49#[configurable_component]
51#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
52#[serde(rename_all = "snake_case")]
53pub enum Compression {
54 #[default]
56 None,
57 Lz4Frame,
59 Zstd,
61}
62
63impl From<Compression> for Option<arrow::ipc::CompressionType> {
64 fn from(value: Compression) -> Self {
65 match value {
66 Compression::None => None,
67 Compression::Lz4Frame => Some(arrow::ipc::CompressionType::LZ4_FRAME),
68 Compression::Zstd => Some(arrow::ipc::CompressionType::ZSTD),
69 }
70 }
71}
72
73#[configurable_component]
78#[derive(Clone, Debug)]
79#[serde(deny_unknown_fields)]
80pub struct ZerobusStreamOptions {
81 #[serde(default = "default_flush_timeout_ms")]
83 #[configurable(metadata(docs::examples = 30000))]
84 pub flush_timeout_ms: u64,
85
86 #[serde(default = "default_server_ack_timeout_ms")]
88 #[configurable(metadata(docs::examples = 60000))]
89 pub server_lack_of_ack_timeout_ms: u64,
90
91 #[configurable(derived)]
93 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
94 pub compression: Compression,
95}
96
97impl Default for ZerobusStreamOptions {
98 fn default() -> Self {
99 Self {
100 flush_timeout_ms: default_flush_timeout_ms(),
101 server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
102 compression: Compression::None,
103 }
104 }
105}
106
107#[configurable_component(sink(
109 "databricks_zerobus",
110 "Stream observability data to Databricks Unity Catalog via Zerobus."
111))]
112#[derive(Clone, Debug)]
113#[serde(deny_unknown_fields)]
114pub struct ZerobusSinkConfig {
115 #[configurable(metadata(
124 docs::examples = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
125 ))]
126 #[configurable(metadata(
127 docs::examples = "https://6543210987654321.zerobus.us-east-1.cloud.databricks.com"
128 ))]
129 pub ingestion_endpoint: String,
130
131 #[configurable(metadata(docs::examples = "main.default.logs"))]
140 #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
141 pub table_name: String,
142
143 #[configurable(metadata(docs::examples = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"))]
152 #[configurable(metadata(docs::examples = "https://dbc-f6e5d4c3-b2a1.cloud.databricks.com"))]
153 pub unity_catalog_endpoint: String,
154
155 #[configurable(derived)]
162 pub auth: DatabricksAuthentication,
163
164 #[serde(default)]
169 #[configurable(metadata(docs::examples = "my-service/1.2"))]
170 pub user_agent: Option<String>,
171
172 #[configurable(derived)]
173 #[serde(default)]
174 pub stream_options: ZerobusStreamOptions,
175
176 #[configurable(derived)]
177 #[serde(default)]
178 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
179
180 #[configurable(derived)]
181 #[serde(default)]
182 pub request: TowerRequestConfig,
183
184 #[configurable(derived)]
185 #[serde(
186 default,
187 deserialize_with = "crate::serde::bool_or_struct",
188 skip_serializing_if = "crate::serde::is_default"
189 )]
190 pub acknowledgements: AcknowledgementsConfig,
191}
192
193impl GenerateConfig for ZerobusSinkConfig {
194 fn generate_config() -> toml::Value {
195 toml::Value::try_from(Self {
196 ingestion_endpoint: "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
197 .to_string(),
198 table_name: "main.default.logs".to_string(),
199 unity_catalog_endpoint: "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com".to_string(),
200 auth: DatabricksAuthentication::OAuth {
201 client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
202 client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
203 },
204 user_agent: None,
205 stream_options: ZerobusStreamOptions::default(),
206 batch: BatchConfig::default(),
207 request: TowerRequestConfig::default(),
208 acknowledgements: AcknowledgementsConfig::default(),
209 })
210 .unwrap()
211 }
212}
213
214#[async_trait::async_trait]
215#[typetag::serde(name = "databricks_zerobus")]
216impl SinkConfig for ZerobusSinkConfig {
217 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
218 self.validate()?;
219
220 let service = ZerobusService::new(self.clone(), cx.proxy()).await?;
221 let healthcheck_service = service.clone();
222
223 let request_limits = self.request.into_settings();
224
225 let sink = ZerobusSink::new(service, request_limits, self.batch)?;
226
227 let healthcheck = async move {
228 healthcheck_service
229 .ensure_stream()
230 .await
231 .map_err(|e| e.into())
232 };
233
234 Ok((
235 VectorSink::from_event_streamsink(sink),
236 Box::pin(healthcheck),
237 ))
238 }
239
240 fn input(&self) -> Input {
241 Input::log()
242 }
243
244 fn acknowledgements(&self) -> &AcknowledgementsConfig {
245 &self.acknowledgements
246 }
247}
248
249impl ZerobusSinkConfig {
250 pub fn validate(&self) -> Result<(), ZerobusSinkError> {
251 if self.ingestion_endpoint.is_empty() {
252 return Err(ZerobusSinkError::ConfigError {
253 message: "ingestion_endpoint cannot be empty".to_string(),
254 });
255 }
256
257 if self.table_name.is_empty() {
258 return Err(ZerobusSinkError::ConfigError {
259 message: "table_name cannot be empty".to_string(),
260 });
261 }
262
263 let parts: Vec<&str> = self.table_name.split('.').collect();
264 if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
265 return Err(ZerobusSinkError::ConfigError {
266 message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
267 .to_string(),
268 });
269 }
270
271 if self.unity_catalog_endpoint.is_empty() {
272 return Err(ZerobusSinkError::ConfigError {
273 message: "unity_catalog_endpoint cannot be empty".to_string(),
274 });
275 }
276
277 match &self.auth {
279 DatabricksAuthentication::OAuth {
280 client_id,
281 client_secret,
282 } => {
283 if client_id.inner().is_empty() {
284 return Err(ZerobusSinkError::ConfigError {
285 message: "OAuth client_id cannot be empty".to_string(),
286 });
287 }
288 if client_secret.inner().is_empty() {
289 return Err(ZerobusSinkError::ConfigError {
290 message: "OAuth client_secret cannot be empty".to_string(),
291 });
292 }
293 }
294 }
295
296 if let Some(max_bytes) = self.batch.max_bytes {
297 if max_bytes > 10_000_000 {
305 return Err(ZerobusSinkError::ConfigError {
306 message: "max_bytes must be less than or equal to 10MB".to_string(),
307 });
308 }
309 }
310
311 Ok(())
312 }
313
314 pub fn user_agent_suffix(&self) -> String {
318 let vector = format!("Vector/{}", crate::vector_version());
319 match self.user_agent.as_deref().filter(|s| !s.is_empty()) {
320 Some(ua) => format!("{vector} {ua}"),
321 None => vector,
322 }
323 }
324}
325
/// Serde default for `ZerobusStreamOptions::flush_timeout_ms`: 30 seconds,
/// expressed in milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30_000
}
330
/// Serde default for `ZerobusStreamOptions::server_lack_of_ack_timeout_ms`:
/// 60 seconds, expressed in milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60_000
}
334
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::sensitive_string::SensitiveString;

    /// Returns a configuration that passes `validate()`; tests mutate one
    /// field at a time from this baseline.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            user_agent: None,
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Asserts that `config.validate()` fails with a `ConfigError` whose
    /// message contains `expected`.
    fn assert_config_error(config: &ZerobusSinkConfig, expected: &str) {
        match config.validate() {
            Err(ZerobusSinkError::ConfigError { message }) => assert!(
                message.contains(expected),
                "expected error containing {expected:?}, got {message:?}"
            ),
            other => panic!("expected ConfigError containing {expected:?}, got {other:?}"),
        }
    }

    #[test]
    fn test_config_validation_success() {
        let config = create_test_config();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = String::new();

        assert_config_error(&config, "ingestion_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = String::new();

        assert_config_error(&config, "table_name cannot be empty");
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        // A single part: the catalog and schema are missing.
        config.table_name = "invalid_table".to_string();

        assert_config_error(&config, "catalog.schema.table");
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();

            assert!(
                config.validate().is_err(),
                "expected error for table_name={bad:?}"
            );
            assert_config_error(&config, "catalog.schema.table");
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = String::new();

        assert_config_error(&config, "unity_catalog_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from(String::new()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };

        assert_config_error(&config, "OAuth client_id cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from(String::new()),
        };

        assert_config_error(&config, "OAuth client_secret cannot be empty");
    }

    #[test]
    fn test_config_validation_max_bytes_limit() {
        // The limit itself is accepted ...
        let mut config = create_test_config();
        config.batch.max_bytes = Some(10_000_000);
        assert!(config.validate().is_ok());

        // ... and one byte over it is rejected.
        config.batch.max_bytes = Some(10_000_001);
        assert_config_error(&config, "max_bytes must be less than or equal to 10MB");
    }

    #[test]
    fn test_stream_options_compression_deserializes() {
        let opts: ZerobusStreamOptions =
            serde_json::from_str(r#"{"compression":"zstd"}"#).expect("should parse zstd");
        assert_eq!(opts.compression, Compression::Zstd);

        let opts: ZerobusStreamOptions =
            serde_json::from_str(r#"{"compression":"lz4_frame"}"#).expect("should parse lz4_frame");
        assert_eq!(opts.compression, Compression::Lz4Frame);

        let opts: ZerobusStreamOptions =
            serde_json::from_str(r#"{"compression":"none"}"#).expect("should parse none");
        assert_eq!(opts.compression, Compression::None);

        // Omitting the field falls back to the default codec.
        let opts: ZerobusStreamOptions = serde_json::from_str("{}").expect("should parse empty");
        assert_eq!(opts.compression, Compression::None);
    }

    #[test]
    fn test_compression_maps_to_arrow_ipc() {
        assert_eq!(
            Option::<arrow::ipc::CompressionType>::from(Compression::None),
            None,
        );
        assert_eq!(
            Option::<arrow::ipc::CompressionType>::from(Compression::Lz4Frame),
            Some(arrow::ipc::CompressionType::LZ4_FRAME),
        );
        assert_eq!(
            Option::<arrow::ipc::CompressionType>::from(Compression::Zstd),
            Some(arrow::ipc::CompressionType::ZSTD),
        );
    }

    // Writes a small batch with each codec so that a build of `arrow` without
    // the codec enabled fails here instead of at runtime.
    #[test]
    fn test_arrow_ipc_compression_codecs_are_enabled() {
        use std::sync::Arc;

        use arrow::array::Int32Array;
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
        use arrow::record_batch::RecordBatch;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from((0..1024).collect::<Vec<_>>()))],
        )
        .expect("batch should build");

        for codec in [Compression::Lz4Frame, Compression::Zstd] {
            let compression: Option<arrow::ipc::CompressionType> = codec.into();
            let options = IpcWriteOptions::default()
                .try_with_compression(compression)
                .unwrap_or_else(|e| panic!("{codec:?} not enabled in arrow build: {e}"));

            let mut buf = Vec::new();
            let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, options)
                .unwrap_or_else(|e| panic!("writer for {codec:?} should build: {e}"));
            writer
                .write(&batch)
                .unwrap_or_else(|e| panic!("writing compressed batch for {codec:?} failed: {e}"));
            writer
                .finish()
                .unwrap_or_else(|e| panic!("finishing stream for {codec:?} failed: {e}"));

            assert!(!buf.is_empty(), "{codec:?} produced no output");
        }
    }

    // With `max_bytes` unset, the resolved batcher settings must still carry
    // a 10 MB size limit.
    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }

    #[test]
    fn test_user_agent_suffix_without_user_value() {
        let config = create_test_config();
        let suffix = config.user_agent_suffix();
        assert!(
            suffix.starts_with("Vector/"),
            "expected Vector/<version> prefix, got {suffix:?}"
        );
        // A space would mean something was appended after the version.
        assert!(
            !suffix.contains(' '),
            "unexpected appended value in {suffix:?}"
        );
    }

    #[test]
    fn test_user_agent_suffix_with_user_value() {
        let mut config = create_test_config();
        config.user_agent = Some("my-service/1.2".to_string());
        let suffix = config.user_agent_suffix();
        assert!(
            suffix.starts_with("Vector/"),
            "expected Vector/<version> prefix, got {suffix:?}"
        );
        assert!(
            suffix.ends_with(" my-service/1.2"),
            "expected user value appended, got {suffix:?}"
        );
    }

    #[test]
    fn test_user_agent_suffix_empty_user_value_ignored() {
        let mut config = create_test_config();
        config.user_agent = Some(String::new());
        let suffix = config.user_agent_suffix();
        // No separator space may be left behind by the empty value.
        assert!(
            !suffix.contains(' '),
            "empty user_agent should be ignored, got {suffix:?}"
        );
    }
}