Skip to main content

vector/sinks/datadog/logs/
sink.rs

1use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};
2
3use itertools::Itertools;
4use snafu::Snafu;
5use tracing::Instrument;
6use vector_lib::{
7    event::{ObjectMap, Value},
8    internal_event::{ComponentEventsDropped, UNINTENTIONAL},
9    lookup::event_path,
10};
11use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};
12
13use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
14use crate::{
15    common::datadog::{DD_RESERVED_SEMANTIC_ATTRS, DDTAGS, MESSAGE, is_reserved_attribute},
16    internal_events::DatadogLogsReservedAttributeConflict,
17    sinks::{
18        prelude::*,
19        util::{Compressor, http::HttpJsonBatchSizer},
20    },
21};
22#[derive(Default)]
23pub struct EventPartitioner;
24
25impl Partitioner for EventPartitioner {
26    type Item = Event;
27    type Key = Option<Arc<str>>;
28
29    fn partition(&self, item: &Self::Item) -> Self::Key {
30        item.metadata().datadog_api_key()
31    }
32}
33
34#[derive(Debug)]
35pub struct LogSinkBuilder<S> {
36    transformer: Transformer,
37    service: S,
38    batch_settings: BatcherSettings,
39    compression: Option<Compression>,
40    default_api_key: Arc<str>,
41    protocol: String,
42    conforms_as_agent: bool,
43}
44
45impl<S> LogSinkBuilder<S> {
46    pub const fn new(
47        transformer: Transformer,
48        service: S,
49        default_api_key: Arc<str>,
50        batch_settings: BatcherSettings,
51        protocol: String,
52        conforms_as_agent: bool,
53    ) -> Self {
54        Self {
55            transformer,
56            service,
57            default_api_key,
58            batch_settings,
59            compression: None,
60            protocol,
61            conforms_as_agent,
62        }
63    }
64
65    pub const fn compression(mut self, compression: Compression) -> Self {
66        self.compression = Some(compression);
67        self
68    }
69
70    pub fn build(self) -> LogSink<S> {
71        LogSink {
72            default_api_key: self.default_api_key,
73            transformer: self.transformer,
74            service: self.service,
75            batch_settings: self.batch_settings,
76            compression: self.compression.unwrap_or_default(),
77            protocol: self.protocol,
78            conforms_as_agent: self.conforms_as_agent,
79        }
80    }
81}
82
83pub struct LogSink<S> {
84    /// The default Datadog API key to use
85    ///
86    /// In some instances an `Event` will come in on the stream with an
87    /// associated API key. That API key is the one it'll get batched up by but
88    /// otherwise we will see `Event` instances with no associated key. In that
89    /// case we batch them by this default.
90    default_api_key: Arc<str>,
91    /// The API service
92    service: S,
93    /// The encoding of payloads
94    transformer: Transformer,
95    /// The compression technique to use when building the request body
96    compression: Compression,
97    /// Batch settings: timeout, max events, max bytes, etc.
98    batch_settings: BatcherSettings,
99    /// The protocol name
100    protocol: String,
101    /// Normalize events to agent standard and attach associated HTTP header to request
102    conforms_as_agent: bool,
103}
104
105// The Datadog logs intake does not require the fields that are set in this
106// function. But if they are present in the event, we normalize the paths
107// (and value in the case of timestamp) to something that intake understands.
108pub fn normalize_event(event: &mut Event) {
109    let log = event.as_mut_log();
110
111    // Will cast the internal value to an object if it already isn't
112    if !log.value().is_object() {
113        log.insert(MESSAGE, log.value().clone());
114    }
115
116    // Upstream Sources may have semantically defined Datadog reserved attributes outside of their
117    // expected location by DD logs intake (root of the event). Move them if needed.
118    for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
119        // check if there is a semantic meaning for the reserved attribute
120        if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
121            // move it to the desired location
122            position_reserved_attr_event_root(log, &current_path, expected_field_name, meaning);
123        }
124    }
125
126    // if the tags value is an array we need to reconstruct it to a comma delimited string for DD logs intake.
127    // NOTE: we don't access by semantic meaning here because in the prior step
128    // we ensured reserved attributes are in expected locations.
129    let ddtags_path = event_path!(DDTAGS);
130    if let Some(Value::Array(tags_arr)) = log.get(ddtags_path)
131        && !tags_arr.is_empty()
132    {
133        let all_tags: String = tags_arr
134            .iter()
135            .filter_map(|tag_kv| {
136                tag_kv
137                    .as_bytes()
138                    .map(|bytes| String::from_utf8_lossy(bytes))
139            })
140            .join(",");
141
142        log.insert(ddtags_path, all_tags);
143    }
144
145    // ensure the timestamp is in expected format
146    // NOTE: we don't access by semantic meaning here because in the prior step
147    // we ensured reserved attributes are in expected locations.
148    let ts_path = event_path!("timestamp");
149    if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
150        log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
151    }
152}
153
154// Optionally for all other non-reserved fields, nest these under the `message` key. This is the
155// final step in having the event conform to the standard that the logs intake expects when an
156// event originates from an agent. Normalizing the events to the format prepared by the datadog
157// agent resolves any inconsistencies that would be observed when data flows through vector
158// before being ingested by the logs intake. This is because the logs intake interprets the
159// request with slight differences when this header and format are observed.
160pub fn normalize_as_agent_event(event: &mut Event) {
161    let log = event.as_mut_log();
162    // Should never occur since normalize_event forces a conversion of the log value to an Object type
163    let Some(object_map) = log.as_map_mut() else {
164        return;
165    };
166    // Move all non reserved fields into a new object
167    let mut local_root = ObjectMap::default();
168    let keys_to_move = object_map
169        .keys()
170        .filter(|ks| !is_reserved_attribute(ks.as_str()))
171        .cloned()
172        .collect::<Vec<_>>();
173    for key in keys_to_move {
174        if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
175            local_root.insert(entry_k, entry_v);
176        }
177    }
178    // .. nest this object at the root under the reserved key named 'message'
179    log.insert(MESSAGE, local_root);
180}
181
182// If an expected reserved attribute is not located in the event root, rename it and handle
183// any potential conflicts by preserving the conflicting one with a _RESERVED_ prefix.
184pub fn position_reserved_attr_event_root(
185    log: &mut LogEvent,
186    current_path: &OwnedTargetPath,
187    expected_field_name: &str,
188    meaning: &'static str,
189) {
190    // the path that DD archives expects this reserved attribute to be in.
191    let desired_path = event_path!(expected_field_name);
192
193    // if not already be at the expected location
194    if !path_is_field(current_path, expected_field_name) {
195        // if an existing attribute exists here already, move it so to not overwrite it.
196        // yes, technically the rename path could exist, but technically that could always be the case.
197        if log.contains(desired_path) {
198            let rename_attr = format!("_RESERVED_{meaning}");
199            let rename_path = event_path!(rename_attr.as_str());
200            emit!(DatadogLogsReservedAttributeConflict {
201                meaning,
202                source_path: current_path,
203                destination_path: expected_field_name,
204                renamed_existing_to: &rename_attr,
205            });
206            log.rename_key(desired_path, rename_path);
207        }
208
209        log.rename_key(current_path, desired_path);
210    }
211}
212
213// Test if the named path consists of the single named field. This is rather a hack and should
214// hypothetically be solvable in the `vrl` crate with an implementation of
215// `PartialEq<BorrowedTargetPath<'_>>`. The alternative is doing a comparison against another
216// `OwnedTargetPath`, but the naïve implementation of that requires multiple allocations and copies
217// just to test equality.
218pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
219    path.prefix == PathPrefix::Event
220        && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
221}
222
223#[derive(Debug, Snafu)]
224pub enum RequestBuildError {
225    #[snafu(display("Encoded payload is greater than the max limit."))]
226    PayloadTooBig { events_that_fit: usize },
227    #[snafu(display("Failed to build payload with error: {}", error))]
228    Io { error: std::io::Error },
229    #[snafu(display("Failed to serialize payload with error: {}", error))]
230    Json { error: serde_json::Error },
231}
232
233impl From<io::Error> for RequestBuildError {
234    fn from(error: io::Error) -> RequestBuildError {
235        RequestBuildError::Io { error }
236    }
237}
238
239impl From<serde_json::Error> for RequestBuildError {
240    fn from(error: serde_json::Error) -> RequestBuildError {
241        RequestBuildError::Json { error }
242    }
243}
244
245struct LogRequestBuilder {
246    pub default_api_key: Arc<str>,
247    pub transformer: Transformer,
248    pub compression: Compression,
249    pub conforms_as_agent: bool,
250}
251
252impl LogRequestBuilder {
253    pub fn build_request(
254        &self,
255        events: Vec<Event>,
256        api_key: Arc<str>,
257    ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
258        // Transform events and pre-compute their estimated size.
259        let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
260            .into_iter()
261            .map(|mut event| {
262                normalize_event(&mut event);
263                if self.conforms_as_agent {
264                    normalize_as_agent_event(&mut event);
265                }
266                self.transformer.transform(&mut event);
267                let estimated_json_size = event.estimated_json_encoded_size_of();
268                (event, estimated_json_size)
269            })
270            .collect();
271
272        // Construct requests respecting the max payload size.
273        let mut requests: Vec<LogApiRequest> = Vec::new();
274        while !events_with_estimated_size.is_empty() {
275            let (events_serialized, body, byte_size) =
276                serialize_with_capacity(&mut events_with_estimated_size)?;
277            if events_serialized.is_empty() {
278                // first event was too large for whole request
279                let _too_big = events_with_estimated_size.pop_front();
280                emit!(ComponentEventsDropped::<UNINTENTIONAL> {
281                    count: 1,
282                    reason: "Event too large to encode."
283                });
284            } else {
285                let request =
286                    self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
287                requests.push(request);
288            }
289        }
290
291        Ok(requests)
292    }
293
294    fn finish_request(
295        &self,
296        buf: Vec<u8>,
297        mut events: Vec<Event>,
298        byte_size: GroupedCountByteSize,
299        api_key: Arc<str>,
300    ) -> Result<LogApiRequest, RequestBuildError> {
301        let n_events = events.len();
302        let uncompressed_size = buf.len();
303
304        // Now just compress it like normal.
305        let mut compressor = Compressor::from(self.compression);
306        write_all(&mut compressor, n_events, &buf)?;
307        let bytes = compressor.into_inner().freeze();
308
309        let finalizers = events.take_finalizers();
310        let request_metadata_builder = RequestMetadataBuilder::from_events(&events);
311
312        let payload = if self.compression.is_compressed() {
313            EncodeResult::compressed(bytes, uncompressed_size, byte_size)
314        } else {
315            EncodeResult::uncompressed(bytes, byte_size)
316        };
317
318        Ok::<_, RequestBuildError>(LogApiRequest {
319            api_key,
320            finalizers,
321            compression: self.compression,
322            metadata: request_metadata_builder.build(&payload),
323            uncompressed_size: payload.uncompressed_byte_size,
324            body: payload.into_payload(),
325        })
326    }
327}
328
329/// Serialize events into a buffer as a JSON array that has a maximum size of
330/// `MAX_PAYLOAD_BYTES`.
331///
332/// Returns the serialized events, the buffer, and the byte size of the events.
333/// Events that are not serialized remain in the `events` parameter.
334pub fn serialize_with_capacity(
335    events: &mut VecDeque<(Event, JsonSize)>,
336) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
337    // Compute estimated size, accounting for the size of the brackets and commas.
338    let total_estimated =
339        events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;
340
341    // Initialize state.
342    let mut buf = Vec::with_capacity(total_estimated);
343    let mut byte_size = telemetry().create_request_count_byte_size();
344    let mut events_serialized = Vec::with_capacity(events.len());
345
346    // Write entries until the buffer is full.
347    buf.push(b'[');
348    let mut first = true;
349    while let Some((event, estimated_json_size)) = events.pop_front() {
350        // Track the existing length of the buffer so we can truncate it if we need to.
351        let existing_len = buf.len();
352        if first {
353            first = false;
354        } else {
355            buf.push(b',');
356        }
357        serde_json::to_writer(&mut buf, event.as_log())?;
358        // If the buffer is too big, truncate it and break out of the loop.
359        if buf.len() >= MAX_PAYLOAD_BYTES {
360            events.push_front((event, estimated_json_size));
361            buf.truncate(existing_len);
362            break;
363        }
364        // Otherwise, track the size of the event and continue.
365        byte_size.add_event(&event, estimated_json_size);
366        events_serialized.push(event);
367    }
368    buf.push(b']');
369
370    Ok((events_serialized, buf, byte_size))
371}
372
373impl<S> LogSink<S>
374where
375    S: Service<LogApiRequest> + Send + 'static,
376    S::Future: Send + 'static,
377    S::Response: DriverResponse + Send + 'static,
378    S::Error: Debug + Into<crate::Error> + Send,
379{
380    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
381        let default_api_key = Arc::clone(&self.default_api_key);
382
383        let partitioner = EventPartitioner;
384        let batch_settings = self.batch_settings;
385        let builder = Arc::new(LogRequestBuilder {
386            default_api_key,
387            transformer: self.transformer,
388            compression: self.compression,
389            conforms_as_agent: self.conforms_as_agent,
390        });
391
392        let input = input.batched_partitioned(partitioner, batch_settings.timeout, |_| {
393            batch_settings.as_item_size_config(HttpJsonBatchSizer)
394        });
395        input
396            .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
397                let builder = Arc::clone(&builder);
398
399                // `concurrent_map` spawns this future on a detached task. The closure itself runs
400                // within `run_inner`'s span, so `in_current_span` captures the sink span here and
401                // re-enters it on the spawned task to preserve the sink's automatic component tags.
402                Box::pin(
403                    async move {
404                        let (api_key, events) = input;
405                        let api_key =
406                            api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
407
408                        builder.build_request(events, api_key)
409                    }
410                    .in_current_span(),
411                )
412            })
413            .filter_map(|request| async move {
414                match request {
415                    Err(error) => {
416                        emit!(SinkRequestBuildError { error });
417                        None
418                    }
419                    Ok(reqs) => Some(futures::stream::iter(reqs)),
420                }
421            })
422            .flatten()
423            .into_driver(self.service)
424            .protocol(self.protocol)
425            .run()
426            .await
427    }
428}
429
430#[async_trait]
431impl<S> StreamSink<Event> for LogSink<S>
432where
433    S: Service<LogApiRequest> + Send + 'static,
434    S::Future: Send + 'static,
435    S::Response: DriverResponse + Send + 'static,
436    S::Error: Debug + Into<crate::Error> + Send,
437{
438    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
439        self.run_inner(input).await
440    }
441}
442
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        event::{Event, EventMetadata, LogEvent},
        schema::{Definition, meaning},
    };
    use vrl::{
        core::Value,
        event_path, metadata_path, owned_value_path, value,
        value::{Kind, kind::Collection},
    };

    use super::{normalize_as_agent_event, normalize_event};
    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;

    /// The two `key:value` tags used by every fixture in this module.
    fn sample_tags() -> Vec<Value> {
        vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ]
    }

    /// The stringified JSON object that `prepare_agent_event` stores under `message`.
    fn sample_message_string() -> String {
        value!({
            "message": "hello world",
            "field_a": "field_a_value",
            "field_b": "field_b_value",
            "field_c": { "field_c_nested" : "field_c_value" },
        })
        .to_string()
    }

    /// Checks that a normalized log has an integer timestamp, all the expected root
    /// attributes, and tags flattened into a comma-delimited string.
    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
        let timestamp = log
            .get(event_path!("timestamp"))
            .expect("should have timestamp");
        assert!(timestamp.is_integer());

        let expected_attrs = [
            "message",
            "timestamp",
            "hostname",
            "ddtags",
            "service",
            "status",
        ];
        for attr in expected_attrs {
            assert!(log.contains(event_path!(attr)), "missing {attr}");
        }

        let tags = log.get(event_path!("ddtags")).expect("should have tags");
        assert_eq!(tags, &Value::Bytes("key1:value1,key2:value2".into()));
    }

    /// Builds event metadata whose schema definition attaches the tags, host, timestamp,
    /// severity, service and source meanings to `datadog_agent` source metadata fields.
    fn agent_event_metadata(definition: Definition) -> EventMetadata {
        let definition = definition
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                &owned_value_path!("ddtags"),
                Kind::bytes(),
                Some(meaning::TAGS),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                &owned_value_path!("hostname"),
                Kind::bytes(),
                Some(meaning::HOST),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some(meaning::TIMESTAMP),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
                &owned_value_path!("severity"),
                Kind::bytes(),
                Some(meaning::SEVERITY),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                &owned_value_path!("service"),
                Kind::bytes(),
                Some(meaning::SERVICE),
            )
            .with_source_metadata(
                "datadog_agent",
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
                &owned_value_path!("source"),
                Kind::bytes(),
                Some(meaning::SOURCE),
            );

        EventMetadata::default().with_schema_definition(&Arc::new(definition))
    }

    #[test]
    fn normalize_event_doesnt_require() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "bar");

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        // None of the optional reserved fields may be conjured up by normalization.
        let normalized = event.as_log();
        for absent in ["message", "timestamp", "hostname"] {
            assert!(!normalized.contains(event_path!(absent)));
        }
    }

    #[test]
    fn normalize_event_normalizes_legacy_namespace() {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        log.insert(event_path!("message"), "the_message");
        let namespace = log.namespace();

        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        log.insert(event_path!("ddtags"), sample_tags());
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Legacy);

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
    }

    #[test]
    fn normalize_event_normalizes_vector_namespace_raw_field() {
        let mut event = prepare_event_vector_namespace(|definition| {
            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);
        assert_eq!(
            normalized.get("message"),
            Some(&value!({"message": "the_message"}))
        );
    }

    /// Builds a Vector-namespaced event from the log produced by `log_generator`, with the
    /// agent fields stored as `datadog_agent` metadata.
    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
        let mut log = log_generator(definition);

        // Insert an arbitrary metadata field so that the log becomes Vector namespaced.
        log.insert(metadata_path!("vector", "foo"), "bar");

        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        log.insert(metadata_path!("datadog_agent", "ddtags"), sample_tags());

        log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
        log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
        log.insert(metadata_path!("datadog_agent", "service"), "the_service");
        log.insert(metadata_path!("datadog_agent", "source"), "the_source");
        log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Vector);
        Event::Log(log)
    }

    #[test]
    fn normalize_event_normalizes_vector_namespace() {
        let mut event = prepare_event_vector_namespace(|definition| {
            let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
            log.insert(event_path!("message"), "the_message");
            log
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);
    }

    /// Builds a Legacy-namespaced log carrying all the agent fields at the root, plus a
    /// stringified JSON object as its message.
    fn prepare_agent_event() -> LogEvent {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        // Insert the mandatory fields.
        log.insert(event_path!("ddtags"), sample_tags());
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("timestamp"), Utc::now());
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        log.insert(event_path!("message"), sample_message_string());
        log
    }

    /// Checks that every key at the log root is either `message` or one of the field names
    /// listed in `DD_RESERVED_SEMANTIC_ATTRS`.
    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
        let root = log.as_map().unwrap();
        let allowed: Vec<&str> = DD_RESERVED_SEMANTIC_ATTRS
            .into_iter()
            .map(|(_, field_name)| field_name)
            .chain(["message"])
            .collect();
        for key in root.keys() {
            assert!(allowed.contains(&key.as_str()));
        }
    }

    #[test]
    fn normalize_conforming_agent_with_collisions() {
        let mut log = prepare_agent_event();

        // Root fields whose names collide with keys inside the sample data at 'message'.
        log.insert(event_path!("field_a"), "replaced_field_a_value");
        log.insert(event_path!("field_c"), "replaced_field_c_value");

        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);

        let expected = value!({
            "source_type": "datadog_agent",
            "field_a": "replaced_field_a_value",
            "field_c": "replaced_field_c_value",
            "message": (sample_message_string()),
        });
        assert_eq!(normalized.get(event_path!("message")), Some(&expected));
    }

    #[test]
    fn normalize_conforming_agent() {
        let mut log = prepare_agent_event();

        // Arbitrary extra fields at the root.
        log.insert(event_path!("field_1"), "value_1");
        log.insert(event_path!("field_2"), "value_2");
        log.insert(event_path!("field_3", "field_3_nested"), "value_3");

        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        // The extra fields must be gone from the root...
        let normalized = event.as_log();
        assert_normalized_log_has_expected_attrs(normalized);
        assert_only_reserved_fields_at_root(normalized);

        // ...and must have been nested underneath `message` instead.
        let expected = value!({
            "source_type": "datadog_agent",
            "message": (sample_message_string()),
            "field_1": "value_1",
            "field_2": "value_2",
            "field_3": {
                "field_3_nested": "value_3"
            }
        });
        assert_eq!(normalized.get(event_path!("message")), Some(&expected));
    }
}