1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf,
8 codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9 config::telemetry,
10 request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14
/// Encodes an input value of type `T` into an [`io::Write`] sink.
pub trait Encoder<T> {
    /// Encodes `input` into `writer`.
    ///
    /// On success, returns the number of bytes written to `writer` together with the
    /// grouped count/byte size of the encoded event(s). The implementations in this module
    /// build the latter from `telemetry().create_request_count_byte_size()`.
    ///
    /// # Errors
    ///
    /// Returns an error if serializing the input fails, or if writing to `writer` fails.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
27
28impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
29 fn encode_input(
30 &self,
31 events: Vec<Event>,
32 writer: &mut dyn io::Write,
33 ) -> io::Result<(usize, GroupedCountByteSize)> {
34 let mut encoder = self.1.clone();
35 let mut bytes_written = 0;
36 let mut n_events_pending = events.len();
37 let is_empty = events.is_empty();
38 let batch_prefix = encoder.batch_prefix();
39 write_all(writer, n_events_pending, batch_prefix)?;
40 bytes_written += batch_prefix.len();
41
42 let mut byte_size = telemetry().create_request_count_byte_size();
43
44 for (position, mut event) in events.into_iter().with_position() {
45 self.0.transform(&mut event);
46
47 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
50
51 let mut bytes = BytesMut::new();
52 match (position, encoder.framer()) {
53 (
54 Position::Last | Position::Only,
55 Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
56 ) => {
57 encoder
58 .serialize(event, &mut bytes)
59 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
60 }
61 _ => {
62 encoder
63 .encode(event, &mut bytes)
64 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
65 }
66 }
67 write_all(writer, n_events_pending, &bytes)?;
68 bytes_written += bytes.len();
69 n_events_pending -= 1;
70 }
71
72 let batch_suffix = encoder.batch_suffix(is_empty);
73 assert!(n_events_pending == 0);
74 write_all(writer, 0, batch_suffix)?;
75 bytes_written += batch_suffix.len();
76
77 Ok((bytes_written, byte_size))
78 }
79}
80
81impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
82 fn encode_input(
83 &self,
84 mut event: Event,
85 writer: &mut dyn io::Write,
86 ) -> io::Result<(usize, GroupedCountByteSize)> {
87 let mut encoder = self.1.clone();
88 self.0.transform(&mut event);
89
90 let mut byte_size = telemetry().create_request_count_byte_size();
91 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93 let mut bytes = BytesMut::new();
94 encoder
95 .serialize(event, &mut bytes)
96 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97 write_all(writer, 1, &bytes)?;
98 Ok((bytes.len(), byte_size))
99 }
100}
101
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
    /// Encodes every event in one call to the batch encoder.
    ///
    /// All events are transformed and counted in the returned byte size before being handed to
    /// the batch encoder together. The returned `usize` is the length of the encoded batch.
    ///
    /// # Errors
    ///
    /// If batch encoding fails, a `ComponentEventsDropped` event covering all input events is
    /// emitted and `InvalidData` is returned. Write failures return the underlying I/O error.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        use vector_lib::internal_event::{ComponentEventsDropped, UNINTENTIONAL};

        let (transformer, encoder) = self;
        let mut batch_encoder = encoder.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();

        // Transform first so the size accounting reflects what is actually encoded.
        let transformed_events: Vec<Event> = events
            .into_iter()
            .map(|mut event| {
                transformer.transform(&mut event);
                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
                event
            })
            .collect();

        let mut buffer = BytesMut::new();
        if let Err(error) = batch_encoder.encode(transformed_events, &mut buffer) {
            // The whole batch is lost when the batch encoder rejects it.
            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                count: n_events,
                reason: "Failed to batch encode events.",
            });
            return Err(io::Error::new(io::ErrorKind::InvalidData, error));
        }

        write_all(writer, n_events, &buffer)?;
        Ok((buffer.len(), byte_size))
    }
}
147
148impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
149 fn encode_input(
150 &self,
151 events: Vec<Event>,
152 writer: &mut dyn io::Write,
153 ) -> io::Result<(usize, GroupedCountByteSize)> {
154 match &self.1 {
156 vector_lib::codecs::EncoderKind::Framed(encoder) => {
157 (self.0.clone(), *encoder.clone()).encode_input(events, writer)
158 }
159 #[cfg(feature = "codecs-arrow")]
160 vector_lib::codecs::EncoderKind::Batch(encoder) => {
161 (self.0.clone(), encoder.clone()).encode_input(events, writer)
162 }
163 }
164 }
165}
166
167pub fn write_all(
176 writer: &mut dyn io::Write,
177 n_events_pending: usize,
178 buf: &[u8],
179) -> io::Result<()> {
180 writer.write_all(buf).inspect_err(|error| {
181 emit!(EncoderWriteError {
182 error,
183 count: n_events_pending,
184 });
185 })
186}
187
/// Runs `f` against `inner` and reports how many bytes `f` wrote through it.
///
/// `f` receives a counting wrapper around `inner`, along with `input`. Each `write` call on the
/// wrapper is forwarded to `inner`, and the length `inner` reports is added to a running total.
///
/// # Errors
///
/// If `f` fails, its error is converted into an [`io::Error`] and returned. Errors from `inner`
/// reach the caller through `f`.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    // Pass-through writer that tallies the bytes accepted by the wrapped writer.
    struct ByteCounter<'w> {
        written: usize,
        target: &'w mut dyn io::Write,
    }

    impl io::Write for ByteCounter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // A raw `write` is the right primitive for a pass-through wrapper: it records
            // exactly what the inner writer accepted, and callers like `write_all` loop on it.
            // NOTE(review): the allow presumably silences a project clippy rule against raw
            // `write`; confirm against the project's clippy configuration.
            #[allow(clippy::disallowed_methods)]
            let accepted = self.target.write(buf)?;
            self.written += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut counter = ByteCounter {
        written: 0,
        target: inner,
    };
    match f(&mut counter, input) {
        Ok(()) => Ok(counter.written),
        Err(error) => Err(error.into()),
    }
}
215
#[cfg(test)]
mod tests {
    //! Tests for the `Encoder` implementations in this module. Each test checks the encoded
    //! bytes, the reported byte count, and the grouped JSON byte size.

    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use cfg_if::cfg_if;
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    // Arrow-specific imports are only needed (and only available) with the feature enabled.
    cfg_if! {
        if #[cfg(feature = "codecs-arrow")] {
            use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
            use vector_lib::codecs::{
                BatchEncoder,
                encoding::{ArrowStreamSerializer, ArrowStreamSerializerConfig, BatchSerializer},
            };
            use vector_lib::event_test_util::{clear_recorded_events, contains_name_once};
        }
    }

    use super::*;

    // An empty comma-delimited JSON batch still emits the `[` prefix and `]` suffix.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single-event JSON batch must not have a trailing comma before `]`.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        // Expected grouped size: sum of the per-event estimates (the default transformer
        // leaves the events unchanged).
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Multiple events are comma-separated, with no delimiter after the last one.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Newline-delimited JSON has no batch prefix or suffix, so an empty batch writes nothing.
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // NDJSON terminates every record with `\n`, including the last (15 bytes + newline).
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Each NDJSON record, including the final one, is followed by a newline.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // The unframed single-event encoder writes only the serialized JSON.
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // The text serializer emits the `message` field's value; the JSON size is still the
    // estimate for the whole event.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    /// Directory holding the protobuf fixtures (`test_proto.pb`, `test_proto.desc`).
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    // A length-delimited protobuf batch of one event must match the reference encoding.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        let mut buf = BytesMut::with_capacity(64);
        // Expected frame: a 4-byte length prefix (`BufMut::put_uint`, big-endian) followed
        // by the raw message bytes.
        buf.reserve(4 + input_proto_size);
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        // Guards against the fixture changing underneath the test.
        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // Two identical events produce two back-to-back length-prefixed frames.
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            buf.reserve(4 + message.len());
            // 4-byte length prefix per frame, then the message bytes.
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        // Two 4-byte length prefixes on top of the message bytes.
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // When the Arrow batch encoder rejects the input, both the serializer's error event and
    // the wrapper's dropped-events event must be recorded exactly once.
    #[cfg(feature = "codecs-arrow")]
    #[test]
    fn test_encode_batch_arrow_emits_record_batch_error_on_type_mismatch() {
        // Reset recorded internal events so the `contains_name_once` checks below see only
        // this test's emissions.
        clear_recorded_events();

        // The schema declares `message` as a non-nullable Int64, but the event below carries
        // a string, so batch encoding is expected to fail.
        let schema = ArrowSchema::new(vec![Field::new("message", DataType::Int64, false)]);
        let serializer = ArrowStreamSerializer::new(ArrowStreamSerializerConfig::new(schema))
            .expect("failed to build ArrowStreamSerializer");
        let encoder = BatchEncoder::new(BatchSerializer::Arrow(serializer));
        let encoding = (Transformer::default(), encoder);

        let event = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("not_an_integer"),
        )])));

        let mut writer = Vec::new();
        let result = encoding.encode_input(vec![event], &mut writer);
        assert!(
            result.is_err(),
            "type mismatch should fail batch encoding, got {result:?}"
        );

        contains_name_once("EncoderRecordBatchError")
            .expect("EncoderRecordBatchError should be emitted on ArrowJsonDecode failure");
        contains_name_once("ComponentEventsDropped")
            .expect("ComponentEventsDropped should be emitted by the wrapper");
    }
}
620}