1use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};
2
3use itertools::Itertools;
4use snafu::Snafu;
5use tracing::Instrument;
6use vector_lib::{
7 event::{ObjectMap, Value},
8 internal_event::{ComponentEventsDropped, UNINTENTIONAL},
9 lookup::event_path,
10};
11use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};
12
13use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
14use crate::{
15 common::datadog::{DD_RESERVED_SEMANTIC_ATTRS, DDTAGS, MESSAGE, is_reserved_attribute},
16 internal_events::DatadogLogsReservedAttributeConflict,
17 sinks::{
18 prelude::*,
19 util::{Compressor, http::HttpJsonBatchSizer},
20 },
21};
/// Stateless partitioner used by the sink to group events before batching.
///
/// Its `Partitioner` implementation keys each event on the Datadog API key found in the
/// event's metadata.
#[derive(Default)]
pub struct EventPartitioner;
24
25impl Partitioner for EventPartitioner {
26 type Item = Event;
27 type Key = Option<Arc<str>>;
28
29 fn partition(&self, item: &Self::Item) -> Self::Key {
30 item.metadata().datadog_api_key()
31 }
32}
33
/// Builder that collects the settings needed to construct a [`LogSink`].
#[derive(Debug)]
pub struct LogSinkBuilder<S> {
    /// Transformer handed to the sink; the sink applies it to each event while building requests.
    transformer: Transformer,
    /// Service handed to the sink; the sink drives built requests through it.
    service: S,
    /// Batching settings handed to the sink.
    batch_settings: BatcherSettings,
    /// Optional compression override; when left as `None`, `build` uses `Compression::default()`.
    compression: Option<Compression>,
    /// API key the sink falls back to for batches whose events carry no key of their own.
    default_api_key: Arc<str>,
    /// Protocol label handed to the sink, which passes it to the request driver.
    protocol: String,
    /// When `true`, the sink additionally applies `normalize_as_agent_event` to each event.
    conforms_as_agent: bool,
}
44
45impl<S> LogSinkBuilder<S> {
46 pub const fn new(
47 transformer: Transformer,
48 service: S,
49 default_api_key: Arc<str>,
50 batch_settings: BatcherSettings,
51 protocol: String,
52 conforms_as_agent: bool,
53 ) -> Self {
54 Self {
55 transformer,
56 service,
57 default_api_key,
58 batch_settings,
59 compression: None,
60 protocol,
61 conforms_as_agent,
62 }
63 }
64
65 pub const fn compression(mut self, compression: Compression) -> Self {
66 self.compression = Some(compression);
67 self
68 }
69
70 pub fn build(self) -> LogSink<S> {
71 LogSink {
72 default_api_key: self.default_api_key,
73 transformer: self.transformer,
74 service: self.service,
75 batch_settings: self.batch_settings,
76 compression: self.compression.unwrap_or_default(),
77 protocol: self.protocol,
78 conforms_as_agent: self.conforms_as_agent,
79 }
80 }
81}
82
/// Sink that partitions incoming log events by API key, batches them, encodes each batch
/// into one or more `LogApiRequest`s and drives those requests through `service`.
pub struct LogSink<S> {
    /// API key used for batches whose events do not carry their own key.
    default_api_key: Arc<str>,
    /// Service that the built requests are driven through.
    service: S,
    /// Transformer applied to every event after normalization, before size estimation.
    transformer: Transformer,
    /// Compression applied to each request body.
    compression: Compression,
    /// Settings controlling how events are batched (timeout and item size configuration).
    batch_settings: BatcherSettings,
    /// Protocol label passed to the request driver.
    protocol: String,
    /// When `true`, events are additionally reshaped by `normalize_as_agent_event`.
    conforms_as_agent: bool,
}
104
105pub fn normalize_event(event: &mut Event) {
109 let log = event.as_mut_log();
110
111 if !log.value().is_object() {
113 log.insert(MESSAGE, log.value().clone());
114 }
115
116 for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
119 if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
121 position_reserved_attr_event_root(log, ¤t_path, expected_field_name, meaning);
123 }
124 }
125
126 let ddtags_path = event_path!(DDTAGS);
130 if let Some(Value::Array(tags_arr)) = log.get(ddtags_path)
131 && !tags_arr.is_empty()
132 {
133 let all_tags: String = tags_arr
134 .iter()
135 .filter_map(|tag_kv| {
136 tag_kv
137 .as_bytes()
138 .map(|bytes| String::from_utf8_lossy(bytes))
139 })
140 .join(",");
141
142 log.insert(ddtags_path, all_tags);
143 }
144
145 let ts_path = event_path!("timestamp");
149 if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
150 log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
151 }
152}
153
154pub fn normalize_as_agent_event(event: &mut Event) {
161 let log = event.as_mut_log();
162 let Some(object_map) = log.as_map_mut() else {
164 return;
165 };
166 let mut local_root = ObjectMap::default();
168 let keys_to_move = object_map
169 .keys()
170 .filter(|ks| !is_reserved_attribute(ks.as_str()))
171 .cloned()
172 .collect::<Vec<_>>();
173 for key in keys_to_move {
174 if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
175 local_root.insert(entry_k, entry_v);
176 }
177 }
178 log.insert(MESSAGE, local_root);
180}
181
182pub fn position_reserved_attr_event_root(
185 log: &mut LogEvent,
186 current_path: &OwnedTargetPath,
187 expected_field_name: &str,
188 meaning: &'static str,
189) {
190 let desired_path = event_path!(expected_field_name);
192
193 if !path_is_field(current_path, expected_field_name) {
195 if log.contains(desired_path) {
198 let rename_attr = format!("_RESERVED_{meaning}");
199 let rename_path = event_path!(rename_attr.as_str());
200 emit!(DatadogLogsReservedAttributeConflict {
201 meaning,
202 source_path: current_path,
203 destination_path: expected_field_name,
204 renamed_existing_to: &rename_attr,
205 });
206 log.rename_key(desired_path, rename_path);
207 }
208
209 log.rename_key(current_path, desired_path);
210 }
211}
212
213pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
219 path.prefix == PathPrefix::Event
220 && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
221}
222
/// Errors that can occur while turning a batch of events into API requests.
#[derive(Debug, Snafu)]
pub enum RequestBuildError {
    /// The encoded payload exceeds the maximum allowed size.
    ///
    /// NOTE(review): this variant is not constructed in the visible part of this file;
    /// `events_that_fit` presumably reports how many events fit under the limit — confirm.
    #[snafu(display("Encoded payload is greater than the max limit."))]
    PayloadTooBig { events_that_fit: usize },
    /// An I/O error was raised while building the payload.
    #[snafu(display("Failed to build payload with error: {}", error))]
    Io { error: std::io::Error },
    /// A JSON serialization error was raised while building the payload.
    #[snafu(display("Failed to serialize payload with error: {}", error))]
    Json { error: serde_json::Error },
}
232
233impl From<io::Error> for RequestBuildError {
234 fn from(error: io::Error) -> RequestBuildError {
235 RequestBuildError::Io { error }
236 }
237}
238
239impl From<serde_json::Error> for RequestBuildError {
240 fn from(error: serde_json::Error) -> RequestBuildError {
241 RequestBuildError::Json { error }
242 }
243}
244
/// Turns batches of events into `LogApiRequest`s.
struct LogRequestBuilder {
    /// API key used when a batch has no key of its own.
    pub default_api_key: Arc<str>,
    /// Transformer applied to each event after normalization.
    pub transformer: Transformer,
    /// Compression applied to each request body.
    pub compression: Compression,
    /// When `true`, each event is also passed through `normalize_as_agent_event`.
    pub conforms_as_agent: bool,
}
251
252impl LogRequestBuilder {
253 pub fn build_request(
254 &self,
255 events: Vec<Event>,
256 api_key: Arc<str>,
257 ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
258 let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
260 .into_iter()
261 .map(|mut event| {
262 normalize_event(&mut event);
263 if self.conforms_as_agent {
264 normalize_as_agent_event(&mut event);
265 }
266 self.transformer.transform(&mut event);
267 let estimated_json_size = event.estimated_json_encoded_size_of();
268 (event, estimated_json_size)
269 })
270 .collect();
271
272 let mut requests: Vec<LogApiRequest> = Vec::new();
274 while !events_with_estimated_size.is_empty() {
275 let (events_serialized, body, byte_size) =
276 serialize_with_capacity(&mut events_with_estimated_size)?;
277 if events_serialized.is_empty() {
278 let _too_big = events_with_estimated_size.pop_front();
280 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
281 count: 1,
282 reason: "Event too large to encode."
283 });
284 } else {
285 let request =
286 self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
287 requests.push(request);
288 }
289 }
290
291 Ok(requests)
292 }
293
294 fn finish_request(
295 &self,
296 buf: Vec<u8>,
297 mut events: Vec<Event>,
298 byte_size: GroupedCountByteSize,
299 api_key: Arc<str>,
300 ) -> Result<LogApiRequest, RequestBuildError> {
301 let n_events = events.len();
302 let uncompressed_size = buf.len();
303
304 let mut compressor = Compressor::from(self.compression);
306 write_all(&mut compressor, n_events, &buf)?;
307 let bytes = compressor.into_inner().freeze();
308
309 let finalizers = events.take_finalizers();
310 let request_metadata_builder = RequestMetadataBuilder::from_events(&events);
311
312 let payload = if self.compression.is_compressed() {
313 EncodeResult::compressed(bytes, uncompressed_size, byte_size)
314 } else {
315 EncodeResult::uncompressed(bytes, byte_size)
316 };
317
318 Ok::<_, RequestBuildError>(LogApiRequest {
319 api_key,
320 finalizers,
321 compression: self.compression,
322 metadata: request_metadata_builder.build(&payload),
323 uncompressed_size: payload.uncompressed_byte_size,
324 body: payload.into_payload(),
325 })
326 }
327}
328
329pub fn serialize_with_capacity(
335 events: &mut VecDeque<(Event, JsonSize)>,
336) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
337 let total_estimated =
339 events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;
340
341 let mut buf = Vec::with_capacity(total_estimated);
343 let mut byte_size = telemetry().create_request_count_byte_size();
344 let mut events_serialized = Vec::with_capacity(events.len());
345
346 buf.push(b'[');
348 let mut first = true;
349 while let Some((event, estimated_json_size)) = events.pop_front() {
350 let existing_len = buf.len();
352 if first {
353 first = false;
354 } else {
355 buf.push(b',');
356 }
357 serde_json::to_writer(&mut buf, event.as_log())?;
358 if buf.len() >= MAX_PAYLOAD_BYTES {
360 events.push_front((event, estimated_json_size));
361 buf.truncate(existing_len);
362 break;
363 }
364 byte_size.add_event(&event, estimated_json_size);
366 events_serialized.push(event);
367 }
368 buf.push(b']');
369
370 Ok((events_serialized, buf, byte_size))
371}
372
373impl<S> LogSink<S>
374where
375 S: Service<LogApiRequest> + Send + 'static,
376 S::Future: Send + 'static,
377 S::Response: DriverResponse + Send + 'static,
378 S::Error: Debug + Into<crate::Error> + Send,
379{
380 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
381 let default_api_key = Arc::clone(&self.default_api_key);
382
383 let partitioner = EventPartitioner;
384 let batch_settings = self.batch_settings;
385 let builder = Arc::new(LogRequestBuilder {
386 default_api_key,
387 transformer: self.transformer,
388 compression: self.compression,
389 conforms_as_agent: self.conforms_as_agent,
390 });
391
392 let input = input.batched_partitioned(partitioner, batch_settings.timeout, |_| {
393 batch_settings.as_item_size_config(HttpJsonBatchSizer)
394 });
395 input
396 .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
397 let builder = Arc::clone(&builder);
398
399 Box::pin(
403 async move {
404 let (api_key, events) = input;
405 let api_key =
406 api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
407
408 builder.build_request(events, api_key)
409 }
410 .in_current_span(),
411 )
412 })
413 .filter_map(|request| async move {
414 match request {
415 Err(error) => {
416 emit!(SinkRequestBuildError { error });
417 None
418 }
419 Ok(reqs) => Some(futures::stream::iter(reqs)),
420 }
421 })
422 .flatten()
423 .into_driver(self.service)
424 .protocol(self.protocol)
425 .run()
426 .await
427 }
428}
429
/// `StreamSink` entry point for [`LogSink`]; all of the work is delegated to `run_inner`.
#[async_trait]
impl<S> StreamSink<Event> for LogSink<S>
where
    S: Service<LogApiRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and processes `input`, returning whatever `run_inner` returns.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
442
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        event::{Event, EventMetadata, LogEvent},
        schema::{Definition, meaning},
    };
    use vrl::{
        core::Value,
        event_path, metadata_path, owned_value_path, value,
        value::{Kind, kind::Collection},
    };

    use super::{normalize_as_agent_event, normalize_event};
    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;

    /// The tag array every fixture stores under `ddtags` before normalization.
    fn sample_tags() -> Vec<Value> {
        vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ]
    }

    /// Checks that a normalized log has an integer timestamp, all expected root attributes,
    /// and `ddtags` flattened into one comma-separated string.
    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
        assert!(
            log.get(event_path!("timestamp"))
                .expect("should have timestamp")
                .is_integer()
        );

        let expected_attrs = [
            "message",
            "timestamp",
            "hostname",
            "ddtags",
            "service",
            "status",
        ];
        for attr in expected_attrs {
            assert!(log.contains(event_path!(attr)), "missing {attr}");
        }

        let tags = log.get(event_path!("ddtags")).expect("should have tags");
        assert_eq!(tags, &Value::Bytes("key1:value1,key2:value2".into()));
    }

    /// Builds event metadata whose schema definition registers the `datadog_agent` source
    /// metadata fields together with their semantic meanings.
    fn agent_event_metadata(definition: Definition) -> EventMetadata {
        let definition = definition
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                &owned_value_path!("ddtags"),
                Kind::bytes(),
                Some(meaning::TAGS),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                &owned_value_path!("hostname"),
                Kind::bytes(),
                Some(meaning::HOST),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some(meaning::TIMESTAMP),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
                &owned_value_path!("severity"),
                Kind::bytes(),
                Some(meaning::SEVERITY),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                &owned_value_path!("service"),
                Kind::bytes(),
                Some(meaning::SERVICE),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
                &owned_value_path!("source"),
                Kind::bytes(),
                Some(meaning::SOURCE),
            );

        EventMetadata::default().with_schema_definition(&Arc::new(definition))
    }

    /// Normalization must not invent reserved attributes the event never had.
    #[test]
    fn normalize_event_doesnt_require() {
        let mut plain_log = LogEvent::default();
        plain_log.insert(event_path!("foo"), "bar");

        let mut event = Event::Log(plain_log);
        normalize_event(&mut event);

        let normalized = event.as_log();
        for absent in ["message", "timestamp", "hostname"] {
            assert!(!normalized.contains(event_path!(absent)));
        }
    }

    #[test]
    fn normalize_event_normalizes_legacy_namespace() {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut legacy_log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        legacy_log.insert(event_path!("message"), "the_message");

        let log_namespace = legacy_log.namespace();
        log_namespace.insert_standard_vector_source_metadata(
            &mut legacy_log,
            "datadog_agent",
            Utc::now(),
        );

        legacy_log.insert(event_path!("ddtags"), sample_tags());
        legacy_log.insert(event_path!("hostname"), "the_host");
        legacy_log.insert(event_path!("service"), "the_service");
        legacy_log.insert(event_path!("source"), "the_source");
        legacy_log.insert(event_path!("severity"), "the_severity");

        assert!(legacy_log.namespace() == LogNamespace::Legacy);

        let mut event = Event::Log(legacy_log);
        normalize_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
    }

    /// A Vector-namespace event whose root is a bare string ends up wrapped under `message`.
    #[test]
    fn normalize_event_normalizes_vector_namespace_raw_field() {
        let mut event = prepare_event_vector_namespace(|definition| {
            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);
        assert_eq!(
            normalized.get("message"),
            Some(&value!({"message": "the_message"}))
        );
    }

    /// Builds a Vector-namespace event: the log comes from `log_generator`, and the
    /// `datadog_agent` fields are stored in its metadata.
    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
        let mut vector_log = log_generator(definition);

        vector_log.insert(metadata_path!("vector", "foo"), "bar");

        let log_namespace = vector_log.namespace();
        log_namespace.insert_standard_vector_source_metadata(
            &mut vector_log,
            "datadog_agent",
            Utc::now(),
        );

        vector_log.insert(metadata_path!("datadog_agent", "ddtags"), sample_tags());

        vector_log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
        vector_log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
        vector_log.insert(metadata_path!("datadog_agent", "service"), "the_service");
        vector_log.insert(metadata_path!("datadog_agent", "source"), "the_source");
        vector_log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");

        assert!(vector_log.namespace() == LogNamespace::Vector);
        Event::Log(vector_log)
    }

    #[test]
    fn normalize_event_normalizes_vector_namespace() {
        let mut event = prepare_event_vector_namespace(|definition| {
            let mut seeded = LogEvent::new_with_metadata(agent_event_metadata(definition));
            seeded.insert(event_path!("message"), "the_message");
            seeded
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);
    }

    /// Builds a Legacy-namespace log with the `datadog_agent` fields at the root and a
    /// `message` holding a stringified object.
    fn prepare_agent_event() -> LogEvent {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut agent_log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        let log_namespace = agent_log.namespace();
        log_namespace.insert_standard_vector_source_metadata(
            &mut agent_log,
            "datadog_agent",
            Utc::now(),
        );

        agent_log.insert(event_path!("ddtags"), sample_tags());
        agent_log.insert(event_path!("hostname"), "the_host");
        agent_log.insert(event_path!("service"), "the_service");
        agent_log.insert(event_path!("timestamp"), Utc::now());
        agent_log.insert(event_path!("source"), "the_source");
        agent_log.insert(event_path!("severity"), "the_severity");

        let sample_message = value!({
            "message": "hello world",
            "field_a": "field_a_value",
            "field_b": "field_b_value",
            "field_c": { "field_c_nested" : "field_c_value" },
        });
        agent_log.insert(event_path!("message"), sample_message.to_string());
        agent_log
    }

    /// Checks that every root key is either a reserved attribute's field name or `message`.
    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
        let root = log.as_map().unwrap();
        let allowed_fields = DD_RESERVED_SEMANTIC_ATTRS
            .into_iter()
            .map(|(_, field)| field)
            .chain(["message"])
            .collect::<Vec<&str>>();
        for key in root.keys() {
            assert!(allowed_fields.iter().any(|field| *field == key.as_str()));
        }
    }

    /// Root fields that share names with keys inside the stringified message are nested
    /// beside that message string.
    #[test]
    fn normalize_conforming_agent_with_collisions() {
        let mut agent_log = prepare_agent_event();

        agent_log.insert(event_path!("field_a"), "replaced_field_a_value");
        agent_log.insert(event_path!("field_c"), "replaced_field_c_value");

        let mut event = Event::Log(agent_log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);
        assert_eq!(
            normalized.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "field_a": "replaced_field_a_value",
                "field_c": "replaced_field_c_value",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
            }))
        );
    }

    /// Non-reserved root fields, including nested ones, are moved under `message`.
    #[test]
    fn normalize_conforming_agent() {
        let mut agent_log = prepare_agent_event();

        agent_log.insert(event_path!("field_1"), "value_1");
        agent_log.insert(event_path!("field_2"), "value_2");
        agent_log.insert(event_path!("field_3", "field_3_nested"), "value_3");

        let mut event = Event::Log(agent_log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);

        assert_eq!(
            normalized.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
                "field_1": "value_1",
                "field_2": "value_2",
                "field_3": {
                    "field_3_nested": "value_3"
                }
            }))
        );
    }
}