Skip to main content

vector/sinks/databend/
config.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use databend_client::APIClient as DatabendAPIClient;
4use futures::future::FutureExt;
5use tower::ServiceBuilder;
6use vector_lib::{
7    codecs::encoding::{Framer, FramingConfig},
8    configurable::{component::GenerateConfig, configurable_component},
9};
10
11use super::{
12    compression::DatabendCompression,
13    encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
14    request_builder::DatabendRequestBuilder,
15    service::{DatabendRetryLogic, DatabendService},
16    sink::DatabendSink,
17};
18use crate::{
19    codecs::{Encoder, EncodingConfig},
20    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
21    http::{Auth, MaybeAuth},
22    sinks::{
23        Healthcheck, VectorSink,
24        util::{
25            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
26            TowerRequestConfig, UriSerde,
27        },
28    },
29    tls::TlsConfig,
30    vector_version,
31};
32
// NOTE(review): the `///` comments on this struct and its fields are likely consumed by
// `configurable_component` as user-facing configuration docs, so implementation notes
// below are written as plain `//` comments to avoid changing that output.
/// Configuration for the `databend` sink.
#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DatabendConfig {
    // `SinkConfig::build` accepts `databend://` DSNs as-is and rewrites legacy `http://`
    // and `https://` endpoints into a DSN built from the authority; any other scheme,
    // a missing scheme, or a missing authority is rejected with an error.
    /// The DSN of the Databend server.
    #[configurable(metadata(
        docs::examples = "databend://localhost:8000/default?sslmode=disable"
    ))]
    pub endpoint: UriSerde,

    // `SinkConfig::build` never reads this field. It is kept so existing configs that set
    // it still deserialize, given `deny_unknown_fields` above.
    /// The TLS configuration to use when connecting to the Databend server.
    #[configurable(
        deprecated = "This option has been deprecated, use arguments in the DSN instead."
    )]
    pub tls: Option<TlsConfig>,

    // When set, `build` replaces the DSN path with `/{database}`.
    /// The database that contains the table that data is inserted into. Overrides the database in DSN.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<String>,

    // Resolved in `build` via `choose_one` against credentials embedded in `endpoint`.
    // Only `Basic` credentials are written into the DSN; `Bearer` and `Custom` produce an
    // error.
    // NOTE(review): the doc text says this "overrides" DSN credentials; whether both may
    // be set at once depends on `MaybeAuth::choose_one` — confirm.
    /// The username and password to authenticate with. Overrides the username and password in DSN.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    // Passed to `DatabendService::new` as the target table name.
    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: String,

    // Only forwarded (as the `missing_field_as` file-format option) when the JSON
    // encoding is selected; the CSV path does not use it.
    #[configurable(derived)]
    #[serde(default)]
    pub missing_field_as: DatabendMissingFieldAS,

    // Chooses the file-format `type` sent to Databend: `NDJSON` for JSON, `CSV` for CSV.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: DatabendEncodingConfig,

    // Used both to compress request payloads and as the `compression` file-format option.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: DatabendCompression,

    // Converted in `build` via `into_batcher_settings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Converted in `build` via `into_settings` and applied to the tower service stack.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Accepts either a bare bool or a struct (`bool_or_struct`); omitted from serialized
    // output when it equals the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
90
91impl GenerateConfig for DatabendConfig {
92    fn generate_config() -> toml::Value {
93        toml::from_str(
94            r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
95            table = "default"
96        "#,
97        )
98        .unwrap()
99    }
100}
101
102#[async_trait::async_trait]
103#[typetag::serde(name = "databend")]
104impl SinkConfig for DatabendConfig {
105    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
106        let ua = format!("vector/{}", vector_version());
107        let auth = self.auth.choose_one(&self.endpoint.auth)?;
108        let authority = self
109            .endpoint
110            .uri
111            .authority()
112            .ok_or("Endpoint missing authority")?;
113        let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
114            Some("databend") => self.endpoint.to_string(),
115            // for backward compatibility, build DSN from endpoint
116            Some("http") => format!("databend://{authority}/?sslmode=disable"),
117            Some("https") => format!("databend://{authority}"),
118            None => {
119                return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
120            }
121            Some(s) => {
122                return Err(format!("Unsupported scheme for Databend endpoint: {s}").into());
123            }
124        };
125        let mut endpoint = url::Url::parse(&endpoint)?;
126        match auth {
127            Some(Auth::Basic { user, password }) => {
128                // Only fails for host-less URLs, which cannot happen given the scheme validation above.
129                endpoint.set_username(&user).ok();
130                endpoint.set_password(Some(password.inner())).ok();
131            }
132            Some(Auth::Bearer { .. }) => {
133                return Err("Bearer authentication is not supported currently".into());
134            }
135            Some(Auth::Custom { .. }) => {
136                return Err("Custom authentication is not supported currently".into());
137            }
138            None => {}
139            #[cfg(feature = "aws-core")]
140            _ => {}
141        }
142        if let Some(database) = &self.database {
143            endpoint.set_path(&format!("/{database}"));
144        }
145        let endpoint = endpoint.to_string();
146        let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
147        let healthcheck = select_one(health_client).boxed();
148
149        let request_settings = self.request.into_settings();
150        let batch_settings = self.batch.into_batcher_settings()?;
151
152        let mut file_format_options = BTreeMap::new();
153        let compression = match self.compression {
154            DatabendCompression::Gzip => {
155                file_format_options.insert("compression", "GZIP");
156                Compression::gzip_default()
157            }
158            DatabendCompression::None => {
159                file_format_options.insert("compression", "NONE");
160                Compression::None
161            }
162        };
163        let encoding: EncodingConfig = self.encoding.clone().into();
164        let serializer = match self.encoding.config() {
165            DatabendSerializerConfig::Json(_) => {
166                file_format_options.insert("type", "NDJSON");
167                file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
168                encoding.build()?
169            }
170            DatabendSerializerConfig::Csv(_) => {
171                file_format_options.insert("type", "CSV");
172                file_format_options.insert("field_delimiter", ",");
173                file_format_options.insert("record_delimiter", "\n");
174                file_format_options.insert("skip_header", "0");
175                encoding.build()?
176            }
177        };
178        let framer = FramingConfig::NewlineDelimited.build();
179        let transformer = encoding.transformer();
180
181        let mut copy_options = BTreeMap::new();
182        copy_options.insert("purge", "true");
183
184        let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
185        let service = DatabendService::new(
186            client,
187            self.table.clone(),
188            file_format_options,
189            copy_options,
190        )?;
191        let service = ServiceBuilder::new()
192            .settings(request_settings, DatabendRetryLogic)
193            .service(service);
194
195        let encoder = Encoder::<Framer>::new(framer, serializer);
196        let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
197
198        let sink = DatabendSink::new(batch_settings, request_builder, service);
199
200        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
201    }
202
203    fn input(&self) -> Input {
204        Input::log()
205    }
206
207    fn acknowledgements(&self) -> &AcknowledgementsConfig {
208        &self.acknowledgements
209    }
210}
211
212async fn select_one(client: Arc<DatabendAPIClient>) -> crate::Result<()> {
213    client.query_all("SELECT 1").await?;
214    Ok(())
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint used by both YAML parsing fixtures below.
    const ENDPOINT: &str = "databend://localhost:8000/mydatabase?sslmode=disable";

    /// Deserializes a YAML snippet into a `DatabendConfig`, panicking on failure.
    fn parse(yaml: &str) -> DatabendConfig {
        serde_yaml::from_str(yaml).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }

    /// A minimal config falls back to JSON encoding and no compression.
    #[test]
    fn parse_config() {
        let cfg = parse(indoc::indoc! {r#"
            endpoint: "databend://localhost:8000/mydatabase?sslmode=disable"
            table: "mytable"
        "#});

        assert_eq!(cfg.endpoint.uri, ENDPOINT);
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Json(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::None));
    }

    /// Explicit CSV encoding and gzip compression are picked up from YAML.
    #[test]
    fn parse_config_with_encoding_compression() {
        let cfg = parse(indoc::indoc! {r#"
            endpoint: "databend://localhost:8000/mydatabase?sslmode=disable"
            table: "mytable"
            encoding:
              codec: "csv"
              csv:
                fields:
                  - "host"
                  - "timestamp"
                  - "message"
            compression: "gzip"
        "#});

        assert_eq!(cfg.endpoint.uri, ENDPOINT);
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Csv(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::Gzip));
    }
}