// codecs/encoding/encoder.rs

use bytes::BytesMut;
use tokio_util::codec::Encoder as _;
use vector_common::internal_event::emit;
use vector_core::event::Event;

#[cfg(feature = "arrow")]
use crate::encoding::ArrowStreamSerializer;
#[cfg(feature = "parquet")]
use crate::encoding::ParquetSerializer;
use crate::{
    encoding::{Error, Framer, Serializer},
    internal_events::{EncoderFramingError, EncoderSerializeError},
};

/// Result produced by encoding a whole batch of events at once.
///
/// This type only exists when the `arrow` feature is enabled.
#[cfg(feature = "arrow")]
#[derive(Debug)]
pub enum BatchOutput {
    /// All events of the batch, encoded as columnar data in a single Arrow
    /// `RecordBatch`.
    Arrow(arrow::record_batch::RecordBatch),
}

/// Serializers able to encode an entire batch of events in one call.
///
/// This type only exists when the `arrow` feature is enabled; the `parquet`
/// feature implies `arrow`.
#[cfg(feature = "arrow")]
#[derive(Debug, Clone)]
pub enum BatchSerializer {
    /// Serializer for the Arrow IPC stream format.
    Arrow(ArrowStreamSerializer),
    /// Serializer for the Parquet format (boxed).
    #[cfg(feature = "parquet")]
    Parquet(Box<ParquetSerializer>),
}

/// Encoder that turns batches of events into bytes via a [`BatchSerializer`].
#[cfg(feature = "arrow")]
#[derive(Debug, Clone)]
pub struct BatchEncoder {
    /// Serializer applied to every batch handed to this encoder.
    serializer: BatchSerializer,
}

#[cfg(feature = "arrow")]
impl BatchEncoder {
    /// Builds a `BatchEncoder` around the given batch serializer.
    pub const fn new(serializer: BatchSerializer) -> Self {
        Self { serializer }
    }

    /// Borrows the batch serializer used by this encoder.
    pub const fn serializer(&self) -> &BatchSerializer {
        &self.serializer
    }

    /// Returns the HTTP content type that matches the configured serializer.
    ///
    /// Every serializer variant currently maps to `Some(..)`.
    pub const fn content_type(&self) -> Option<&'static str> {
        let mime = match &self.serializer {
            BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
            #[cfg(feature = "parquet")]
            BatchSerializer::Parquet(_) => "application/vnd.apache.parquet",
        };
        Some(mime)
    }

    /// Encodes a slice of events into a [`BatchOutput`].
    ///
    /// # Errors
    ///
    /// * Arrow: a `NullConstraint` failure is reported as
    ///   `Error::SchemaConstraintViolation`; any other Arrow encoding failure
    ///   is reported as `Error::SerializingError`.
    /// * Parquet: always fails with `Error::SerializingError`, because that
    ///   serializer is only usable through the tokio `Encoder` interface.
    pub fn encode_batch(&self, events: &[Event]) -> Result<BatchOutput, Error> {
        use crate::encoding::ArrowEncodingError;

        match &self.serializer {
            BatchSerializer::Arrow(arrow) => match arrow.encode_to_record_batch(events) {
                Ok(batch) => Ok(BatchOutput::Arrow(batch)),
                // Null-constraint violations get their own error variant so
                // callers can tell them apart from generic failures.
                Err(err @ ArrowEncodingError::NullConstraint { .. }) => {
                    Err(Error::SchemaConstraintViolation(Box::new(err)))
                }
                Err(err) => Err(Error::SerializingError(Box::new(err))),
            },
            #[cfg(feature = "parquet")]
            BatchSerializer::Parquet(_) => {
                let message = "Parquet serializer does not support encode_batch; use the tokio Encoder interface instead";
                Err(Error::SerializingError(Box::from(message)))
            }
        }
    }
}

/// Byte-oriented batch encoding: serializes a vector of events into `buffer`.
#[cfg(feature = "arrow")]
impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
    type Error = Error;

    /// Delegates to the configured serializer and maps its failure into
    /// [`Error`].
    ///
    /// For Arrow, a `NullConstraint` failure becomes
    /// `Error::SchemaConstraintViolation`; every other Arrow failure becomes
    /// `Error::SerializingError`. Parquet failures are wrapped in
    /// `Error::SerializingError` as-is.
    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        use crate::encoding::ArrowEncodingError;

        match &mut self.serializer {
            BatchSerializer::Arrow(arrow) => match arrow.encode(events, buffer) {
                Ok(()) => Ok(()),
                Err(err @ ArrowEncodingError::NullConstraint { .. }) => {
                    Err(Error::SchemaConstraintViolation(Box::new(err)))
                }
                Err(err) => Err(Error::SerializingError(Box::new(err))),
            },
            #[cfg(feature = "parquet")]
            BatchSerializer::Parquet(parquet) => {
                parquet.encode(events, buffer).map_err(Error::SerializingError)
            }
        }
    }
}

115/// A wrapper that supports both framed and batch encoding modes.
116#[derive(Debug, Clone)]
117pub enum EncoderKind {
118    /// Uses framing to encode individual events
119    Framed(Box<Encoder<Framer>>),
120    /// Encodes events in batches without framing
121    #[cfg(feature = "arrow")]
122    Batch(BatchEncoder),
123}
124
125#[derive(Debug, Clone)]
126/// An encoder that can encode structured events into byte frames.
127pub struct Encoder<Framer>
128where
129    Framer: Clone,
130{
131    framer: Framer,
132    serializer: Serializer,
133}
134
135impl Default for Encoder<Framer> {
136    fn default() -> Self {
137        use crate::encoding::{NewlineDelimitedEncoder, TextSerializerConfig};
138
139        Self {
140            framer: NewlineDelimitedEncoder::default().into(),
141            serializer: TextSerializerConfig::default().build().into(),
142        }
143    }
144}
145
146impl Default for Encoder<()> {
147    fn default() -> Self {
148        use crate::encoding::TextSerializerConfig;
149
150        Self {
151            framer: (),
152            serializer: TextSerializerConfig::default().build().into(),
153        }
154    }
155}
156
157impl<Framer> Encoder<Framer>
158where
159    Framer: Clone,
160{
161    /// Serialize the event without applying framing.
162    pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
163        let len = buffer.len();
164        let mut payload = buffer.split_off(len);
165
166        self.serialize_at_start(event, &mut payload)?;
167
168        buffer.unsplit(payload);
169
170        Ok(())
171    }
172
173    /// Serialize the event without applying framing, at the start of the provided buffer.
174    fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
175        self.serializer.encode(event, buffer).map_err(|error| {
176            emit(EncoderSerializeError { error: &error });
177            Error::SerializingError(error)
178        })
179    }
180}
181
182impl Encoder<Framer> {
183    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
184    /// from a structured event, and the `Framer` to wrap these into a byte
185    /// frame.
186    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
187        Self { framer, serializer }
188    }
189
190    /// Get the framer.
191    pub const fn framer(&self) -> &Framer {
192        &self.framer
193    }
194
195    /// Get the serializer.
196    pub const fn serializer(&self) -> &Serializer {
197        &self.serializer
198    }
199
200    /// Get the prefix that encloses a batch of events.
201    pub const fn batch_prefix(&self) -> &[u8] {
202        match (&self.framer, &self.serializer) {
203            (
204                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
205                    delimiter: b',',
206                }),
207                Serializer::Json(_) | Serializer::NativeJson(_),
208            ) => b"[",
209            _ => &[],
210        }
211    }
212
213    /// Get the suffix that encloses a batch of events.
214    pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
215        match (&self.framer, &self.serializer, empty) {
216            (
217                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
218                    delimiter: b',',
219                }),
220                Serializer::Json(_) | Serializer::NativeJson(_),
221                _,
222            ) => b"]",
223            (Framer::NewlineDelimited(_), _, false) => b"\n",
224            _ => &[],
225        }
226    }
227
228    /// Get the HTTP content type.
229    pub const fn content_type(&self) -> &'static str {
230        match (&self.serializer, &self.framer) {
231            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
232                "application/x-ndjson"
233            }
234            (
235                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
236                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
237                    delimiter: b',',
238                }),
239            ) => "application/json",
240            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
241            (
242                Serializer::Avro(_)
243                | Serializer::Cef(_)
244                | Serializer::Csv(_)
245                | Serializer::Gelf(_)
246                | Serializer::Json(_)
247                | Serializer::Logfmt(_)
248                | Serializer::NativeJson(_)
249                | Serializer::RawMessage(_)
250                | Serializer::Text(_),
251                _,
252            ) => "text/plain",
253            #[cfg(feature = "syslog")]
254            (Serializer::Syslog(_), _) => "text/plain",
255            #[cfg(feature = "opentelemetry")]
256            (Serializer::Otlp(_), _) => "application/x-protobuf",
257        }
258    }
259}
260
261impl Encoder<()> {
262    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
263    /// from a structured event.
264    pub const fn new(serializer: Serializer) -> Self {
265        Self {
266            framer: (),
267            serializer,
268        }
269    }
270
271    /// Get the serializer.
272    pub const fn serializer(&self) -> &Serializer {
273        &self.serializer
274    }
275}
276
277impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
278    type Error = Error;
279
280    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
281        let len = buffer.len();
282        let mut payload = buffer.split_off(len);
283
284        self.serialize_at_start(event, &mut payload)?;
285
286        // Frame the serialized event.
287        self.framer.encode((), &mut payload).map_err(|error| {
288            emit(EncoderFramingError { error: &error });
289            Error::FramingError(error)
290        })?;
291
292        buffer.unsplit(payload);
293
294        Ok(())
295    }
296}
297
298impl tokio_util::codec::Encoder<Event> for Encoder<()> {
299    type Error = Error;
300
301    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
302        let len = buffer.len();
303        let mut payload = buffer.split_off(len);
304
305        self.serialize_at_start(event, &mut payload)?;
306
307        buffer.unsplit(payload);
308
309        Ok(())
310    }
311}
312
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_core::event::LogEvent;

    use super::*;
    use crate::encoding::BoxedFramingError;

    /// Test framer that wraps the buffered payload in parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Take the payload out, write the opening paren, then put the
            // payload back behind it and close.
            let payload = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(payload);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to `inner` and reports an error on the call
    /// whose zero-based index equals `fail_at`.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        inner: T,
        calls: usize,
        fail_at: usize,
    }

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self {
                inner: encoder,
                calls: 0,
                fail_at: n,
            }
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            // The inner framer always runs first; its own errors propagate
            // without advancing the call counter.
            self.inner.encode((), dst)?;
            let call = self.calls;
            self.calls += 1;
            if call == self.fail_at {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds an encoder using the default text serializer and `framer`.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(
            framer,
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        )
    }

    /// Builds one log event per message, in order.
    fn log_events(messages: &[&str]) -> Vec<Event> {
        messages
            .iter()
            .map(|message| Event::Log(LogEvent::from(*message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let events = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        events.forward(&mut writer).await.unwrap();

        let written = writer.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let events = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::from("(foo)"), encoder);

        events.forward(&mut writer).await.unwrap();

        let written = writer.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let events = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        let outcome = events.forward(&mut writer).await;
        assert!(outcome.is_err());
        writer.flush().await.unwrap();

        let written = writer.into_inner();
        assert_eq!(written, b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let events = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::from("(foo)"), encoder);

        let outcome = events.forward(&mut writer).await;
        assert!(outcome.is_err());
        writer.flush().await.unwrap();

        let written = writer.into_inner();
        assert_eq!(written, b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            crate::encoding::NewlineDelimitedEncoder::default(),
        ));
        let events = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        events.forward(&mut writer).await.unwrap();

        let written = writer.into_inner();
        assert_eq!(written, b"bar\nbaz\nbat\n");
    }
}
483}