Skip to main content

vector/sinks/util/
encoding.rs

1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf,
8    codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9    config::telemetry,
10    request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14
15pub trait Encoder<T> {
16    /// Encodes the input into the provided writer.
17    ///
18    /// # Errors
19    ///
20    /// If an I/O error is encountered while encoding the input, an error variant will be returned.
21    fn encode_input(
22        &self,
23        input: T,
24        writer: &mut dyn io::Write,
25    ) -> io::Result<(usize, GroupedCountByteSize)>;
26}
27
28impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
29    fn encode_input(
30        &self,
31        events: Vec<Event>,
32        writer: &mut dyn io::Write,
33    ) -> io::Result<(usize, GroupedCountByteSize)> {
34        let mut encoder = self.1.clone();
35        let mut bytes_written = 0;
36        let mut n_events_pending = events.len();
37        let is_empty = events.is_empty();
38        let batch_prefix = encoder.batch_prefix();
39        write_all(writer, n_events_pending, batch_prefix)?;
40        bytes_written += batch_prefix.len();
41
42        let mut byte_size = telemetry().create_request_count_byte_size();
43
44        for (position, mut event) in events.into_iter().with_position() {
45            self.0.transform(&mut event);
46
47            // Ensure the json size is calculated after any fields have been removed
48            // by the transformer.
49            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
50
51            let mut bytes = BytesMut::new();
52            match (position, encoder.framer()) {
53                (
54                    Position::Last | Position::Only,
55                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
56                ) => {
57                    encoder
58                        .serialize(event, &mut bytes)
59                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
60                }
61                _ => {
62                    encoder
63                        .encode(event, &mut bytes)
64                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
65                }
66            }
67            write_all(writer, n_events_pending, &bytes)?;
68            bytes_written += bytes.len();
69            n_events_pending -= 1;
70        }
71
72        let batch_suffix = encoder.batch_suffix(is_empty);
73        assert!(n_events_pending == 0);
74        write_all(writer, 0, batch_suffix)?;
75        bytes_written += batch_suffix.len();
76
77        Ok((bytes_written, byte_size))
78    }
79}
80
81impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
82    fn encode_input(
83        &self,
84        mut event: Event,
85        writer: &mut dyn io::Write,
86    ) -> io::Result<(usize, GroupedCountByteSize)> {
87        let mut encoder = self.1.clone();
88        self.0.transform(&mut event);
89
90        let mut byte_size = telemetry().create_request_count_byte_size();
91        byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93        let mut bytes = BytesMut::new();
94        encoder
95            .serialize(event, &mut bytes)
96            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97        write_all(writer, 1, &bytes)?;
98        Ok((bytes.len(), byte_size))
99    }
100}
101
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
    /// Transforms every event, encodes the whole batch with a single call to the batch
    /// encoder, and writes the encoded bytes to `writer`.
    ///
    /// Because the batch is encoded as a unit, an encoding failure drops every event in it.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` error if batch encoding fails, or the underlying I/O error if
    /// writing the encoded batch fails.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        use vector_lib::internal_event::{ComponentEventsDropped, UNINTENTIONAL};

        let (transformer, batch_encoder) = self;
        let mut encoder = batch_encoder.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();

        // Size each event after the transformer runs so that removed fields are not counted.
        let transformed_events: Vec<Event> = events
            .into_iter()
            .map(|mut event| {
                transformer.transform(&mut event);
                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
                event
            })
            .collect();

        let mut bytes = BytesMut::new();
        encoder
            .encode(transformed_events, &mut bytes)
            .map_err(|error| {
                // Codec error paths emit their own internal event
                // (e.g. SchemaGenerationError, EncoderNullConstraintError,
                // EncoderRecordBatchError) which logs the error and increments
                // component_errors_total. We only emit the drop count here to
                // avoid double-counting.
                // n_events is the pre-filter count; Parquet filters non-log
                // events before encoding, but that only happens if a sink is
                // misconfigured to send non-log events into a log-only encoder,
                // so the overcount is not a practical concern.
                emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                    count: n_events,
                    reason: "Failed to batch encode events.",
                });
                io::Error::new(io::ErrorKind::InvalidData, error)
            })?;

        write_all(writer, n_events, &bytes)?;
        Ok((bytes.len(), byte_size))
    }
}
147
148impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
149    fn encode_input(
150        &self,
151        events: Vec<Event>,
152        writer: &mut dyn io::Write,
153    ) -> io::Result<(usize, GroupedCountByteSize)> {
154        // Delegate to the specific encoder implementation
155        match &self.1 {
156            vector_lib::codecs::EncoderKind::Framed(encoder) => {
157                (self.0.clone(), *encoder.clone()).encode_input(events, writer)
158            }
159            #[cfg(feature = "codecs-arrow")]
160            vector_lib::codecs::EncoderKind::Batch(encoder) => {
161                (self.0.clone(), encoder.clone()).encode_input(events, writer)
162            }
163        }
164    }
165}
166
167/// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
168/// instrumentation spec- as this necessitates both an Error and EventsDropped event.
169///
170/// # Arguments
171///
172/// * `writer`           - The object implementing io::Write to write data to.
173/// * `n_events_pending` - The number of events that are dropped if this write fails.
174/// * `buf`              - The buffer to write.
175pub fn write_all(
176    writer: &mut dyn io::Write,
177    n_events_pending: usize,
178    buf: &[u8],
179) -> io::Result<()> {
180    writer.write_all(buf).inspect_err(|error| {
181        emit!(EncoderWriteError {
182            error,
183            count: n_events_pending,
184        });
185    })
186}
187
/// Runs `f` against a wrapper around `inner` and returns how many bytes `f` wrote through it.
///
/// Bytes are counted as `inner` accepts them, so the total reflects what was actually written
/// even when `f` issues many small writes.
///
/// # Errors
///
/// Any error returned by `f`, or by `inner` while `f` runs, is converted into an
/// [`io::Error`] and returned.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Pass-through writer that tallies the bytes accepted by the wrapped writer.
    struct CountingWriter<'a> {
        written: usize,
        target: &'a mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // The result of `write` is handed straight back to the caller, who deals with
            // short writes.
            #[allow(clippy::disallowed_methods)]
            let accepted = self.target.write(buf)?;
            self.written += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut counter = CountingWriter {
        written: 0,
        target: inner,
    };
    f(&mut counter, input).map_err(Into::<io::Error>::into)?;
    Ok(counter.written)
}
215
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use cfg_if::cfg_if;
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    cfg_if! {
        if #[cfg(feature = "codecs-arrow")] {
            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
            use vector_lib::codecs::{
                BatchEncoder,
                encoding::{ArrowStreamSerializer, ArrowStreamSerializerConfig, BatchSerializer},
            };
            use vector_lib::event_test_util::{clear_recorded_events, contains_name_once};
        }
    }

    use super::*;

    /// Builds a `(Transformer, Encoder<Framer>)` pair that serializes events as JSON and frames
    /// them with `framer`.
    fn json_encoding(framer: Framer) -> (Transformer, vector_lib::codecs::Encoder<Framer>) {
        (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                framer,
                JsonSerializerConfig::default().build().into(),
            ),
        )
    }

    /// Builds a log event containing a single string field `key => value`.
    fn single_field_event(key: &str, value: &str) -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from(key),
            Value::from(value),
        )])))
    }

    /// Sums the estimated JSON-encoded size of `events`.
    fn total_json_size(events: &[Event]) -> JsonSize {
        events
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum()
    }

    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Builds a length-delimited encoding that serializes events as `test_proto.User`.
    fn protobuf_encoding() -> (Transformer, vector_lib::codecs::Encoder<Framer>) {
        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        )
    }

    /// Builds the log event whose protobuf encoding is stored in `test_proto.pb`.
    fn user_event() -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))
    }

    /// Prefixes each message with a 4-byte big-endian length, the default
    /// `LengthDelimitedCoderOptions.length_field_length`, and concatenates the results.
    fn length_delimited(messages: &[&[u8]]) -> Bytes {
        let mut buf = BytesMut::new();
        for message in messages {
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(message);
        }
        buf.freeze()
    }

    /// Writer that rejects every write, used to exercise error paths.
    struct FailingWriter;

    impl io::Write for FailingWriter {
        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "write rejected"))
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = json_encoding(CharacterDelimitedEncoder::new(b',').into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_json_single() {
        let encoding = json_encoding(CharacterDelimitedEncoder::new(b',').into());

        let mut writer = Vec::new();
        let input = vec![single_field_event("key", "value")];
        let input_json_size = total_json_size(&input);

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = json_encoding(CharacterDelimitedEncoder::new(b',').into());

        let input = vec![
            single_field_event("key", "value1"),
            single_field_event("key", "value2"),
            single_field_event("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = json_encoding(NewlineDelimitedEncoder::default().into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = json_encoding(NewlineDelimitedEncoder::default().into());

        let mut writer = Vec::new();
        let input = vec![single_field_event("key", "value")];
        let input_json_size = total_json_size(&input);

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = json_encoding(NewlineDelimitedEncoder::default().into());

        let mut writer = Vec::new();
        let input = vec![
            single_field_event("key", "value1"),
            single_field_event("key", "value2"),
            single_field_event("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = single_field_event("key", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = single_field_event("message", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();
        let expected_bytes = length_delimited(&[message_raw.as_slice()]);

        let encoding = protobuf_encoding();

        let mut writer = Vec::new();
        let input = vec![user_event()];
        let input_json_size = total_json_size(&input);

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let total_input_proto_size = message_raw.len() * 2;
        let expected_bytes = length_delimited(&[message_raw.as_slice(), message_raw.as_slice()]);

        let encoding = protobuf_encoding();

        let mut writer = Vec::new();
        let input = vec![user_event(), user_event()];
        let input_json_size = total_json_size(&input);

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_write_all_writes_whole_buffer() {
        let mut writer: Vec<u8> = Vec::new();
        write_all(&mut writer, 1, b"payload").unwrap();
        assert_eq!(writer, b"payload");
    }

    #[test]
    fn test_write_all_propagates_error() {
        let error = write_all(&mut FailingWriter, 3, b"payload").unwrap_err();
        assert_eq!(error.kind(), io::ErrorKind::BrokenPipe);
    }

    #[test]
    fn test_as_tracked_write_counts_bytes() {
        let mut writer: Vec<u8> = Vec::new();
        let written =
            as_tracked_write(&mut writer, "hello", |w, s| w.write_all(s.as_bytes())).unwrap();
        assert_eq!(written, 5);
        assert_eq!(writer, b"hello");
    }

    #[test]
    fn test_as_tracked_write_propagates_error() {
        let mut writer: Vec<u8> = Vec::new();
        let error = as_tracked_write(&mut writer, (), |_, ()| {
            Err::<(), io::Error>(io::Error::new(io::ErrorKind::InvalidInput, "rejected"))
        })
        .unwrap_err();
        assert_eq!(error.kind(), io::ErrorKind::InvalidInput);
        assert!(writer.is_empty());
    }

    #[cfg(feature = "codecs-arrow")]
    #[test]
    fn test_encode_batch_arrow_emits_record_batch_error_on_type_mismatch() {
        clear_recorded_events();

        // Schema declares `message` as Int64, but the event below carries a string,
        // so `build_record_batch` returns `ArrowEncodingError::ArrowJsonDecode`.
        let schema = ArrowSchema::new(vec![Field::new("message", DataType::Int64, false)]);
        let serializer = ArrowStreamSerializer::new(ArrowStreamSerializerConfig::new(schema))
            .expect("failed to build ArrowStreamSerializer");
        let encoder = BatchEncoder::new(BatchSerializer::Arrow(serializer));
        let encoding = (Transformer::default(), encoder);

        let event = single_field_event("message", "not_an_integer");

        let mut writer = Vec::new();
        let result = encoding.encode_input(vec![event], &mut writer);
        assert!(
            result.is_err(),
            "type mismatch should fail batch encoding, got {result:?}"
        );

        contains_name_once("EncoderRecordBatchError")
            .expect("EncoderRecordBatchError should be emitted on ArrowJsonDecode failure");
        contains_name_once("ComponentEventsDropped")
            .expect("ComponentEventsDropped should be emitted by the wrapper");
    }
}