1use std::{collections::BTreeMap, sync::Arc};
2
3use databend_client::APIClient as DatabendAPIClient;
4use futures::future::FutureExt;
5use tower::ServiceBuilder;
6use vector_lib::{
7 codecs::encoding::{Framer, FramingConfig},
8 configurable::{component::GenerateConfig, configurable_component},
9};
10
11use super::{
12 compression::DatabendCompression,
13 encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
14 request_builder::DatabendRequestBuilder,
15 service::{DatabendRetryLogic, DatabendService},
16 sink::DatabendSink,
17};
18use crate::{
19 codecs::{Encoder, EncodingConfig},
20 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
21 http::{Auth, MaybeAuth},
22 sinks::{
23 Healthcheck, VectorSink,
24 util::{
25 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
26 TowerRequestConfig, UriSerde,
27 },
28 },
29 tls::TlsConfig,
30 vector_version,
31};
32
/// Configuration for the `databend` sink.
#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DatabendConfig {
    /// The DSN of the Databend server.
    ///
    /// `databend://` DSNs are used as-is. Legacy `http://` and `https://` endpoints are
    /// converted into a `databend://` DSN that keeps only the host and port; `http` endpoints
    /// additionally get `sslmode=disable`.
    #[configurable(metadata(
        docs::examples = "databend://localhost:8000/default?sslmode=disable"
    ))]
    pub endpoint: UriSerde,

    /// The TLS configuration to use when connecting to the Databend server.
    ///
    /// This option is no longer read when building the sink; configure TLS through arguments
    /// in the DSN (`endpoint`) instead.
    #[configurable(
        deprecated = "This option has been deprecated, use arguments in the DSN instead."
    )]
    pub tls: Option<TlsConfig>,

    /// The database that contains the target table.
    ///
    /// When set, this replaces the database (path) component of the DSN.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<String>,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: String,

    #[configurable(derived)]
    #[serde(default)]
    pub missing_field_as: DatabendMissingFieldAS,

    #[configurable(derived)]
    #[serde(default)]
    pub encoding: DatabendEncodingConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub compression: DatabendCompression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
90
91impl GenerateConfig for DatabendConfig {
92 fn generate_config() -> toml::Value {
93 toml::from_str(
94 r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
95 table = "default"
96 "#,
97 )
98 .unwrap()
99 }
100}
101
102#[async_trait::async_trait]
103#[typetag::serde(name = "databend")]
104impl SinkConfig for DatabendConfig {
105 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
106 let ua = format!("vector/{}", vector_version());
107 let auth = self.auth.choose_one(&self.endpoint.auth)?;
108 let authority = self
109 .endpoint
110 .uri
111 .authority()
112 .ok_or("Endpoint missing authority")?;
113 let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
114 Some("databend") => self.endpoint.to_string(),
115 Some("http") => format!("databend://{authority}/?sslmode=disable"),
117 Some("https") => format!("databend://{authority}"),
118 None => {
119 return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
120 }
121 Some(s) => {
122 return Err(format!("Unsupported scheme for Databend endpoint: {s}").into());
123 }
124 };
125 let mut endpoint = url::Url::parse(&endpoint)?;
126 match auth {
127 Some(Auth::Basic { user, password }) => {
128 endpoint.set_username(&user).ok();
130 endpoint.set_password(Some(password.inner())).ok();
131 }
132 Some(Auth::Bearer { .. }) => {
133 return Err("Bearer authentication is not supported currently".into());
134 }
135 Some(Auth::Custom { .. }) => {
136 return Err("Custom authentication is not supported currently".into());
137 }
138 None => {}
139 #[cfg(feature = "aws-core")]
140 _ => {}
141 }
142 if let Some(database) = &self.database {
143 endpoint.set_path(&format!("/{database}"));
144 }
145 let endpoint = endpoint.to_string();
146 let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
147 let healthcheck = select_one(health_client).boxed();
148
149 let request_settings = self.request.into_settings();
150 let batch_settings = self.batch.into_batcher_settings()?;
151
152 let mut file_format_options = BTreeMap::new();
153 let compression = match self.compression {
154 DatabendCompression::Gzip => {
155 file_format_options.insert("compression", "GZIP");
156 Compression::gzip_default()
157 }
158 DatabendCompression::None => {
159 file_format_options.insert("compression", "NONE");
160 Compression::None
161 }
162 };
163 let encoding: EncodingConfig = self.encoding.clone().into();
164 let serializer = match self.encoding.config() {
165 DatabendSerializerConfig::Json(_) => {
166 file_format_options.insert("type", "NDJSON");
167 file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
168 encoding.build()?
169 }
170 DatabendSerializerConfig::Csv(_) => {
171 file_format_options.insert("type", "CSV");
172 file_format_options.insert("field_delimiter", ",");
173 file_format_options.insert("record_delimiter", "\n");
174 file_format_options.insert("skip_header", "0");
175 encoding.build()?
176 }
177 };
178 let framer = FramingConfig::NewlineDelimited.build();
179 let transformer = encoding.transformer();
180
181 let mut copy_options = BTreeMap::new();
182 copy_options.insert("purge", "true");
183
184 let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
185 let service = DatabendService::new(
186 client,
187 self.table.clone(),
188 file_format_options,
189 copy_options,
190 )?;
191 let service = ServiceBuilder::new()
192 .settings(request_settings, DatabendRetryLogic)
193 .service(service);
194
195 let encoder = Encoder::<Framer>::new(framer, serializer);
196 let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
197
198 let sink = DatabendSink::new(batch_settings, request_builder, service);
199
200 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
201 }
202
203 fn input(&self) -> Input {
204 Input::log()
205 }
206
207 fn acknowledgements(&self) -> &AcknowledgementsConfig {
208 &self.acknowledgements
209 }
210}
211
212async fn select_one(client: Arc<DatabendAPIClient>) -> crate::Result<()> {
213 client.query_all("SELECT 1").await?;
214 Ok(())
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// DSN shared by the YAML fixtures below.
    const DSN: &str = "databend://localhost:8000/mydatabase?sslmode=disable";

    /// Deserializes a YAML snippet into a [`DatabendConfig`], panicking on invalid input.
    fn from_yaml(yaml: &str) -> DatabendConfig {
        serde_yaml::from_str::<DatabendConfig>(yaml).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }

    #[test]
    fn parse_config() {
        let cfg = from_yaml(indoc::indoc! {r#"
            endpoint: "databend://localhost:8000/mydatabase?sslmode=disable"
            table: "mytable"
        "#});

        // Only the required fields are given, so encoding and compression use their defaults.
        assert_eq!(cfg.endpoint.uri, DSN);
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Json(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::None));
    }

    #[test]
    fn parse_config_with_encoding_compression() {
        let cfg = from_yaml(indoc::indoc! {r#"
            endpoint: "databend://localhost:8000/mydatabase?sslmode=disable"
            table: "mytable"
            encoding:
              codec: "csv"
              csv:
                fields:
                  - "host"
                  - "timestamp"
                  - "message"
            compression: "gzip"
        "#});

        assert_eq!(cfg.endpoint.uri, DSN);
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Csv(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::Gzip));
    }
}