1use vector_lib::{
2 codecs::{
3 MetricTagValues,
4 encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5 },
6 configurable::configurable_component,
7 sensitive_string::SensitiveString,
8};
9
10use crate::{
11 codecs::{EncodingConfigWithFraming, Transformer},
12 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13 http::Auth as HttpAuthConfig,
14 sinks::{
15 Healthcheck, VectorSink,
16 http::config::{HttpMethod, HttpSinkConfig},
17 util::{
18 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
19 http::{RequestConfig, RetryStrategy},
20 },
21 },
22 tls::TlsConfig,
23};
24
/// Base URL of the Axiom cloud API, used when neither `url` nor `region` is configured.
const CLOUD_URL: &str = "https://api.axiom.co";
26
27#[configurable_component]
29#[derive(Clone, Debug, Default)]
30#[serde(default)]
31pub struct UrlOrRegion {
32 #[configurable(validation(format = "uri"))]
38 #[configurable(metadata(docs::examples = "https://api.eu.axiom.co"))]
39 #[configurable(metadata(docs::examples = "http://localhost:3400/ingest"))]
40 #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
41 pub url: Option<String>,
42
43 #[configurable(metadata(docs::examples = "${AXIOM_REGION}"))]
49 #[configurable(metadata(docs::examples = "mumbai.axiom.co"))]
50 #[configurable(metadata(docs::examples = "eu-central-1.aws.edge.axiom.co"))]
51 pub region: Option<String>,
52}
53
54impl UrlOrRegion {
55 fn validate(&self) -> crate::Result<()> {
57 if self.url.is_some() && self.region.is_some() {
58 return Err("Cannot set both `url` and `region`. Please use only one.".into());
59 }
60 Ok(())
61 }
62
63 pub fn url(&self) -> Option<&str> {
65 self.url.as_deref()
66 }
67
68 pub fn region(&self) -> Option<&str> {
70 self.region.as_deref()
71 }
72}
73
74#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
76#[derive(Clone, Debug, Default)]
77pub struct AxiomConfig {
78 #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
82 #[configurable(metadata(docs::examples = "123abc"))]
83 pub org_id: Option<String>,
84
85 #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
87 #[configurable(metadata(docs::examples = "123abc"))]
88 pub token: SensitiveString,
89
90 #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
92 #[configurable(metadata(docs::examples = "vector_rocks"))]
93 pub dataset: String,
94
95 #[serde(flatten)]
97 #[configurable(derived)]
98 pub endpoint: UrlOrRegion,
99
100 #[configurable(derived)]
101 #[serde(default)]
102 pub request: RequestConfig,
103
104 #[configurable(derived)]
106 #[serde(default = "Compression::zstd_default")]
107 pub compression: Compression,
108
109 #[configurable(derived)]
113 pub tls: Option<TlsConfig>,
114
115 #[configurable(derived)]
117 #[serde(default)]
118 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
119
120 #[configurable(derived)]
122 #[serde(
123 default,
124 deserialize_with = "crate::serde::bool_or_struct",
125 skip_serializing_if = "crate::serde::is_default"
126 )]
127 pub acknowledgements: AcknowledgementsConfig,
128
129 #[configurable(derived)]
130 #[serde(default)]
131 pub retry_strategy: RetryStrategy,
132}
133
134impl GenerateConfig for AxiomConfig {
135 fn generate_config() -> toml::Value {
136 toml::from_str(
137 r#"token = "${AXIOM_TOKEN}"
138 dataset = "${AXIOM_DATASET}"
139 url = "${AXIOM_URL}"
140 org_id = "${AXIOM_ORG_ID}""#,
141 )
142 .unwrap()
143 }
144}
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "axiom")]
148impl SinkConfig for AxiomConfig {
149 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
150 self.endpoint.validate()?;
152
153 let mut request = self.request.clone();
154 if let Some(org_id) = &self.org_id {
155 request
157 .headers
158 .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
159 }
160
161 let http_sink_config = HttpSinkConfig {
168 uri: self.build_endpoint().try_into()?,
169 compression: self.compression,
170 auth: Some(HttpAuthConfig::Bearer {
171 token: self.token.clone(),
172 }),
173 method: HttpMethod::Post,
174 tls: self.tls.clone(),
175 request,
176 acknowledgements: self.acknowledgements,
177 batch: self.batch,
178 encoding: EncodingConfigWithFraming::new(
179 Some(FramingConfig::NewlineDelimited),
180 SerializerConfig::Json(JsonSerializerConfig {
181 metric_tag_values: MetricTagValues::Single,
182 options: JsonSerializerOptions { pretty: false }, }),
184 Transformer::default(),
185 ),
186 payload_prefix: "".into(), payload_suffix: "".into(), retry_strategy: self.retry_strategy.clone(),
189 };
190
191 http_sink_config.build(cx).await
192 }
193
194 fn input(&self) -> Input {
195 Input::new(DataType::Metric | DataType::Log | DataType::Trace)
196 }
197
198 fn acknowledgements(&self) -> &AcknowledgementsConfig {
199 &self.acknowledgements
200 }
201}
202
203impl AxiomConfig {
204 fn build_endpoint(&self) -> String {
205 if let Some(url) = self.endpoint.url() {
209 let url = url.trim_end_matches('/');
210
211 if let Ok(parsed) = url::Url::parse(url) {
215 let path = parsed.path();
216 if path.is_empty() || path == "/" {
217 return format!("{url}/v1/datasets/{}/ingest", self.dataset);
219 }
220 }
221
222 return url.to_string();
224 }
225
226 if let Some(region) = self.endpoint.region() {
228 let region = region.trim_end_matches('/');
229 return format!("https://{region}/v1/ingest/{}", self.dataset);
230 }
231
232 format!("{CLOUD_URL}/v1/datasets/{}/ingest", self.dataset)
234 }
235}
236
#[cfg(test)]
mod test {
    use super::{AxiomConfig, UrlOrRegion};

    /// Builds a config for `dataset` with the given endpoint and defaults everywhere else.
    fn config_with(dataset: &str, endpoint: UrlOrRegion) -> AxiomConfig {
        AxiomConfig {
            endpoint,
            dataset: dataset.to_string(),
            ..Default::default()
        }
    }

    /// Builds a config that only sets `url`.
    fn config_with_url(dataset: &str, url: &str) -> AxiomConfig {
        config_with(
            dataset,
            UrlOrRegion {
                url: Some(url.to_string()),
                region: None,
            },
        )
    }

    /// Builds a config that only sets `region`.
    fn config_with_region(dataset: &str, region: &str) -> AxiomConfig {
        config_with(
            dataset,
            UrlOrRegion {
                region: Some(region.to_string()),
                url: None,
            },
        )
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AxiomConfig>();
    }

    #[test]
    fn test_region_domain_only() {
        let config = config_with_region("test-3", "mumbai.axiomdomain.co");
        assert_eq!(
            config.build_endpoint(),
            "https://mumbai.axiomdomain.co/v1/ingest/test-3"
        );
    }

    #[test]
    fn test_default_no_config() {
        // Neither `url` nor `region` is set: the cloud endpoint is used.
        let config = AxiomConfig {
            dataset: "foo".to_string(),
            ..Default::default()
        };
        assert_eq!(
            config.build_endpoint(),
            "https://api.axiom.co/v1/datasets/foo/ingest"
        );
    }

    #[test]
    fn test_url_with_custom_path() {
        // A URL that already has a path is used verbatim.
        let config = config_with_url("meh", "http://localhost:3400/ingest");
        assert_eq!(config.build_endpoint(), "http://localhost:3400/ingest");
    }

    #[test]
    fn test_url_without_path_backwards_compat() {
        // With and without a trailing slash, a bare host gets the legacy path appended.
        for url in ["https://api.eu.axiom.co", "https://api.eu.axiom.co/"] {
            let config = config_with_url("qoo", url);
            assert_eq!(
                config.build_endpoint(),
                "https://api.eu.axiom.co/v1/datasets/qoo/ingest"
            );
        }
    }

    #[test]
    fn test_both_url_and_region_fails_validation() {
        let endpoint = UrlOrRegion {
            url: Some("http://localhost:3400/ingest".to_string()),
            region: Some("mumbai.axiomdomain.co".to_string()),
        };

        let outcome = endpoint.validate();
        assert!(outcome.is_err());
        assert_eq!(
            outcome.unwrap_err().to_string(),
            "Cannot set both `url` and `region`. Please use only one."
        );
    }

    #[test]
    fn test_url_or_region_deserialization_with_url() {
        let config: AxiomConfig = serde_yaml::from_str(indoc::indoc! {r#"
            token: "test-token"
            dataset: "test-dataset"
            url: "https://api.eu.axiom.co"
        "#})
        .unwrap();

        assert_eq!(config.endpoint.url(), Some("https://api.eu.axiom.co"));
        assert_eq!(config.endpoint.region(), None);
    }

    #[test]
    fn test_url_or_region_deserialization_with_region() {
        let config: AxiomConfig = serde_yaml::from_str(indoc::indoc! {r#"
            token: "test-token"
            dataset: "test-dataset"
            region: "mumbai.axiom.co"
        "#})
        .unwrap();

        assert_eq!(config.endpoint.url(), None);
        assert_eq!(config.endpoint.region(), Some("mumbai.axiom.co"));
    }

    #[test]
    fn test_production_regional_edges() {
        let config = config_with_region("my-dataset", "eu-central-1.aws.edge.axiom.co");
        assert_eq!(
            config.build_endpoint(),
            "https://eu-central-1.aws.edge.axiom.co/v1/ingest/my-dataset"
        );
    }

    #[test]
    fn test_staging_environment_edges() {
        let config = config_with_region("test-dataset", "us-east-1.edge.staging.axiomdomain.co");
        assert_eq!(
            config.build_endpoint(),
            "https://us-east-1.edge.staging.axiomdomain.co/v1/ingest/test-dataset"
        );
    }

    #[test]
    fn test_dev_environment_edges() {
        let config = config_with_region("dev-dataset", "eu-west-1.edge.dev.axiomdomain.co");
        assert_eq!(
            config.build_endpoint(),
            "https://eu-west-1.edge.dev.axiomdomain.co/v1/ingest/dev-dataset"
        );
    }
}