Skip to main content

codecs/encoding/format/
parquet.rs

1// Derivative's Debug impl generates 'let _ = field.fmt(f)' which triggers this lint.
2#![allow(clippy::let_underscore_must_use)]
3
4//! Parquet batch format codec for batched event encoding
5//!
6//! Provides Apache Parquet format encoding with schema file support and auto-inference.
7//! Reuses the Arrow record batch building logic from the Arrow IPC codec,
8//! then writes the batch as a complete Parquet file using `ArrowWriter`.
9
10use std::collections::HashSet;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
15use arrow::error::ArrowError;
16use arrow::json::reader::infer_json_schema_from_iterator;
17use arrow::record_batch::RecordBatch;
18use bytes::{BufMut, BytesMut};
19use derivative::Derivative;
20use parquet::arrow::ArrowWriter;
21use parquet::basic::ZstdLevel;
22use parquet::basic::{Compression as ParquetCodecCompression, GzipLevel};
23use parquet::file::properties::WriterProperties;
24use std::io::{Error, ErrorKind};
25use tracing::warn;
26use vector_common::internal_event::{
27    ComponentEventsDropped, Count, InternalEventHandle, Registered, UNINTENTIONAL, emit, register,
28};
29use vector_config::configurable_component;
30use vector_core::event::Event;
31
32use super::arrow::{ArrowEncodingError, build_record_batch};
33use crate::encoding::format::arrow::vector_log_events_to_json_values;
34use crate::internal_events::{ArrowWriterError, JsonSerializationError, SchemaGenerationError};
35
/// Events-dropped internal event (flagged `UNINTENTIONAL`) that this encoder
/// uses to report non-log events it discards while encoding a batch.
type EventsDroppedError = ComponentEventsDropped<'static, UNINTENTIONAL>;
37
/// Compression codec, and optional compression level, applied to the column pages
/// of the generated Parquet file.
#[configurable_component]
#[derive(Default, Copy, Clone, Debug, PartialEq)]
#[configurable(metadata(
    docs::enum_tag_description = "Compression codec applied per column page inside the Parquet file."
))]
#[serde(tag = "algorithm", rename_all = "snake_case")]
pub enum ParquetCompression {
    /// Zstd compression. Level must be between 1 and 21.
    Zstd {
        /// Compression level (1–21). This is the range Vector supports; higher values compress more but are slower.
        #[configurable(validation(range(min = 1, max = 21)))]
        level: u8,
    },
    /// Gzip compression. Level must be between 1 and 9.
    Gzip {
        /// Compression level (1–9). This is the range Vector supports; higher values compress more but are slower.
        #[configurable(validation(range(min = 1, max = 9)))]
        level: u8,
    },

    /// Snappy compression (no level). This is the default.
    #[default]
    Snappy,

    /// LZ4 raw compression (no level).
    Lz4,

    /// No compression.
    None,
}
69
70impl TryFrom<ParquetCompression> for ParquetCodecCompression {
71    type Error = parquet::errors::ParquetError;
72    fn try_from(
73        value: ParquetCompression,
74    ) -> Result<ParquetCodecCompression, parquet::errors::ParquetError> {
75        match value {
76            ParquetCompression::None => Ok(ParquetCodecCompression::UNCOMPRESSED),
77            ParquetCompression::Snappy => Ok(ParquetCodecCompression::SNAPPY),
78            ParquetCompression::Zstd { level } => Ok(ParquetCodecCompression::ZSTD(
79                ZstdLevel::try_new(level.into())?,
80            )),
81            ParquetCompression::Gzip { level } => Ok(ParquetCodecCompression::GZIP(
82                GzipLevel::try_new(level.into())?,
83            )),
84            ParquetCompression::Lz4 => Ok(ParquetCodecCompression::LZ4_RAW),
85        }
86    }
87}
88
/// Schema handling mode: where the Arrow schema comes from and how events that
/// do not match it are treated.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ParquetSchemaMode {
    /// Use the schema from `schema_file`. Missing fields become null. Extra fields are silently dropped.
    #[default]
    Relaxed,
    /// Use the schema from `schema_file`. Missing fields become null. Extra top-level fields cause an error.
    Strict,
    /// Infer the schema from each batch. No schema file is needed.
    AutoInfer,
}
102
/// Configuration for the Parquet serializer.
///
/// Encodes events as Apache Parquet columnar files, optimized for analytical queries
/// via Athena, Trino, Spark, and other columnar query engines.
///
/// Either `schema_file` must be provided, or `schema_mode` must be set to `auto_infer`.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct ParquetSerializerConfig {
    /// Path to a native Parquet schema file (`.schema`).
    ///
    /// Required unless `schema_mode` is `auto_infer`, in which case it is ignored. The file
    /// must contain a valid Parquet message type definition and be at most 10 MB. Columns
    /// that map to Arrow binary types are rejected; annotate text columns with `(STRING)`.
    #[serde(default)]
    pub schema_file: Option<PathBuf>,

    /// Compression codec applied per column page inside the Parquet file.
    #[serde(default)]
    #[configurable(derived)]
    pub compression: ParquetCompression,

    /// Controls where the schema comes from (`schema_file` or per-batch inference) and
    /// how events with fields not present in the schema are handled.
    #[serde(default)]
    #[configurable(derived)]
    pub schema_mode: ParquetSchemaMode,
}
129
130impl ParquetSerializerConfig {
131    /// Resolve the Arrow schema from the configured schema source.
132    fn resolve_schema(&self) -> Result<Schema, Box<dyn std::error::Error + Send + Sync>> {
133        if self.schema_mode == ParquetSchemaMode::AutoInfer {
134            return Ok(Schema::empty());
135        }
136
137        let path = self
138            .schema_file
139            .as_ref()
140            .ok_or("schema_file is required unless schema_mode is auto_infer")?;
141
142        let content = read_schema_file(path, "schema_file")?;
143        let parquet_type = parquet::schema::parser::parse_message_type(&content)
144            .map_err(|e| format!("Failed to parse Parquet schema: {e}"))?;
145        let schema_desc = parquet::schema::types::SchemaDescriptor::new(Arc::new(parquet_type));
146        let arrow_schema = parquet::arrow::parquet_to_arrow_schema(&schema_desc, None)
147            .map_err(|e| format!("Failed to convert Parquet schema to Arrow: {e}"))?;
148        Ok(arrow_schema)
149    }
150
151    /// The data type of events that are accepted by `ParquetSerializer`.
152    pub fn input_type(&self) -> vector_core::config::DataType {
153        vector_core::config::DataType::Log
154    }
155
156    /// The schema required by the serializer.
157    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
158        vector_core::schema::Requirement::empty()
159    }
160}
161
/// Reads the text of a schema file, refusing anything larger than 10 MB.
///
/// `field_name` names the config option the path came from and is used only in
/// error messages.
///
/// The size limit is checked against the file's metadata (fast path for regular
/// files) and enforced again while reading. Sources whose metadata reports no length,
/// such as FIFOs, character devices like `/dev/zero`, or procfs entries, therefore
/// cannot bypass the limit and exhaust memory.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read, exceeds the size limit,
/// or is not valid UTF-8.
fn read_schema_file(
    path: &std::path::Path,
    field_name: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    use std::io::Read;

    const MAX_SCHEMA_FILE_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
    let display = path.display();
    // Only captures shared references, so the closure is `Copy` and reusable below.
    let read_error = |e: std::io::Error| format!("Failed to read {field_name} '{display}': {e}");

    let file = std::fs::File::open(path).map_err(read_error)?;
    let declared_len = file.metadata().map_err(read_error)?.len();
    if declared_len > MAX_SCHEMA_FILE_SIZE {
        return Err(format!(
            "{field_name} '{display}' is too large ({declared_len} bytes, max {MAX_SCHEMA_FILE_SIZE})"
        )
        .into());
    }

    // Read at most one byte past the limit: enough to detect an oversized source
    // without buffering all of it.
    let mut bytes = Vec::with_capacity(usize::try_from(declared_len).unwrap_or(0));
    file.take(MAX_SCHEMA_FILE_SIZE + 1)
        .read_to_end(&mut bytes)
        .map_err(read_error)?;
    if bytes.len() as u64 > MAX_SCHEMA_FILE_SIZE {
        return Err(format!(
            "{field_name} '{display}' is too large (more than {MAX_SCHEMA_FILE_SIZE} bytes)"
        )
        .into());
    }

    String::from_utf8(bytes)
        .map_err(|e| format!("Failed to read {field_name} '{display}': {e}").into())
}
180
181/// Check the resolved Arrow schema for data types unsupported by the JSON-based
182/// encode path (`arrow::json::reader::ReaderBuilder`). Binary variants are
183/// accepted by Parquet/Arrow at the schema level but the JSON decoder rejects
184/// them at runtime, so we fail fast here at config time.
185fn reject_unsupported_arrow_types(
186    schema: &Schema,
187) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
188    fn check_field(field: &Field, path: &str, bad: &mut Vec<String>) {
189        let name = if path.is_empty() {
190            field.name().to_string()
191        } else {
192            format!("{path}.{}", field.name())
193        };
194        match field.data_type() {
195            DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
196                bad.push(format!("'{name}' ({:?})", field.data_type()));
197            }
198            DataType::Struct(fields) => {
199                for f in fields {
200                    check_field(f, &name, bad);
201                }
202            }
203            DataType::List(inner) | DataType::LargeList(inner) => {
204                check_field(inner, &name, bad);
205            }
206            DataType::Map(entries_field, _) => {
207                if let DataType::Struct(kv) = entries_field.data_type() {
208                    for f in kv {
209                        check_field(f, &name, bad);
210                    }
211                }
212            }
213            _ => {}
214        }
215    }
216
217    let mut bad = Vec::new();
218    for field in schema.fields() {
219        check_field(field, "", &mut bad);
220    }
221    if !bad.is_empty() {
222        return Err(format!(
223            "Schema contains binary field(s) unsupported by the JSON-based Arrow encoder: {}. \
224             Use Utf8 for base64/hex-encoded data instead.",
225            bad.join(", ")
226        )
227        .into());
228    }
229    Ok(())
230}
231
/// Parquet batch serializer.
///
/// Each `encode` call turns one batch of log events into a complete, standalone
/// Parquet file written into the output buffer.
#[derive(Derivative)]
#[derivative(Debug, Clone)]
pub struct ParquetSerializer {
    /// Arrow schema that record batches are built against. It is resolved from the
    /// config at construction. In `auto_infer` mode it starts empty and is replaced
    /// by the schema inferred from each batch.
    schema: SchemaRef,
    /// Parquet writer settings (currently only the compression codec), behind an
    /// `Arc` so cloning the serializer shares rather than copies them.
    writer_props: Arc<WriterProperties>,
    /// Selects relaxed or strict validation, or per-batch schema inference.
    schema_mode: ParquetSchemaMode,
    /// Pre-built set of schema field names for O(1) strict-mode lookups.
    ///
    /// Built once from the configured schema and not refreshed when `auto_infer`
    /// replaces `schema`. It is only consulted in strict mode.
    schema_field_names: HashSet<String>,

    /// Handle used to report events this serializer drops (non-log events);
    /// excluded from `Debug` output.
    #[derivative(Debug = "ignore")]
    events_dropped_handle: Registered<EventsDroppedError>,
}
245
246impl ParquetSerializer {
247    /// Create a new `ParquetSerializer` from the given configuration.
248    pub fn new(
249        config: ParquetSerializerConfig,
250    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> {
251        let schema = config.resolve_schema()?;
252        reject_unsupported_arrow_types(&schema)?;
253        let schema_ref = SchemaRef::new(schema);
254
255        let schema_field_names = schema_ref
256            .fields()
257            .iter()
258            .map(|f| f.name().clone())
259            .collect::<HashSet<_>>();
260
261        let writer_props = Arc::new(
262            WriterProperties::builder()
263                .set_compression(config.compression.try_into()?)
264                .build(),
265        );
266
267        Ok(Self {
268            schema: schema_ref,
269            writer_props,
270            schema_mode: config.schema_mode,
271            schema_field_names,
272            events_dropped_handle: register(EventsDroppedError::from(
273                "Events could not be serialized to parquet",
274            )),
275        })
276    }
277
278    /// Returns the MIME content type for Parquet data.
279    pub const fn content_type(&self) -> &'static str {
280        "application/vnd.apache.parquet"
281    }
282
283    /// Writes `record_batch` into `buffer` as a complete Parquet file.
284    ///
285    /// On failure, emits an [`ArrowWriterError`] internal event (which
286    /// increments `component_errors_total`) before returning the error.
287    /// The caller is responsible for emitting `events_dropped`.
288    fn write_record_batch(
289        record_batch: &RecordBatch,
290        buffer: &mut BytesMut,
291        writer_props: &WriterProperties,
292    ) -> Result<(), parquet::errors::ParquetError> {
293        let mut writer = ArrowWriter::try_new(
294            buffer.writer(),
295            Arc::clone(record_batch.schema_ref()),
296            Some(writer_props.clone()),
297        )
298        .inspect_err(|e| {
299            emit(ArrowWriterError { error: e });
300        })?;
301
302        writer.write(record_batch).inspect_err(|e| {
303            emit(ArrowWriterError { error: e });
304        })?;
305
306        writer.close().inspect_err(|e| {
307            emit(ArrowWriterError { error: e });
308        })?;
309
310        Ok(())
311    }
312}
313
314impl tokio_util::codec::Encoder<Vec<Event>> for ParquetSerializer {
315    type Error = vector_common::Error;
316
317    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
318        if events.is_empty() {
319            return Ok(());
320        }
321
322        let json_values = match vector_log_events_to_json_values(&events) {
323            Ok(values) => values,
324            Err(e) => {
325                emit(JsonSerializationError { error: &e });
326                return Err(Box::new(e));
327            }
328        };
329
330        let non_log_count = events.len() - json_values.len();
331
332        if non_log_count > 0 {
333            warn!(
334                message = "Non-log events dropped by Parquet encoder ",
335                %non_log_count,
336                internal_log_rate_secs = 10,
337            );
338            self.events_dropped_handle.emit(Count(non_log_count))
339        }
340
341        if json_values.is_empty() {
342            return Ok(());
343        }
344
345        match self.schema_mode {
346            // In strict mode, check for extra top-level fields not in the schema.
347            ParquetSchemaMode::Strict => {
348                for event in &events {
349                    if let Some(log) = event.maybe_as_log()
350                        && let Some(object_map) = log.as_map()
351                    {
352                        for top_level in object_map.keys() {
353                            if !self.schema_field_names.contains(top_level.as_str()) {
354                                return Err(Box::new(ArrowEncodingError::SchemaFetchError {
355                                    message: format!(
356                                        "Strict schema mode: event contains field '{top_level}' not in schema",
357                                    ),
358                                }));
359                            }
360                        }
361                    }
362                }
363            }
364            ParquetSchemaMode::AutoInfer => {
365                let schema = ParquetSchemaGenerator::infer_schema(&json_values)?;
366                self.schema = Arc::new(ParquetSchemaGenerator::try_normalize_schema(
367                    &events, schema,
368                ));
369            }
370            ParquetSchemaMode::Relaxed => {}
371        }
372
373        let record_batch =
374            build_record_batch(Arc::clone(&self.schema), &json_values).map_err(Box::new)?;
375
376        Self::write_record_batch(&record_batch, buffer, &self.writer_props).map_err(Box::new)?;
377
378        Ok(())
379    }
380}
381
382pub struct ParquetSchemaGenerator {}
383
384impl ParquetSchemaGenerator {
385    pub fn infer_schema(events: &[serde_json::Value]) -> Result<Schema, Error> {
386        let schema = infer_json_schema_from_iterator(events.iter().map(Ok::<_, ArrowError>))
387            .map_err(|e| {
388                emit(SchemaGenerationError { error: &e });
389                Error::new(ErrorKind::InvalidData, e.to_string())
390            })?;
391
392        Ok(schema)
393    }
394
395    /// Attempt to modify schema to set timestamp fields as Timestamp instead of Utf8.
396    /// Only works for top-level fields.
397    fn try_normalize_schema(events: &[Event], schema: Schema) -> Schema {
398        let mut ts_seen: HashSet<String> = HashSet::new();
399        let mut non_ts_seen: HashSet<String> = HashSet::new();
400
401        for event in events.iter().filter_map(Event::maybe_as_log) {
402            if let Some(object_map) = event.as_map() {
403                for (path, value) in object_map {
404                    if value.is_timestamp() {
405                        ts_seen.insert(path.to_string());
406                    } else if !value.is_null() {
407                        non_ts_seen.insert(path.to_string());
408                    }
409                }
410            }
411        }
412
413        let new_fields: Vec<Field> = schema
414            .fields()
415            .iter()
416            .map(|f| {
417                if ts_seen.contains(f.name()) && !non_ts_seen.contains(f.name()) {
418                    Field::new(
419                        f.name(),
420                        DataType::Timestamp(
421                            arrow::datatypes::TimeUnit::Microsecond,
422                            Some("UTC".into()),
423                        ),
424                        f.is_nullable(),
425                    )
426                } else {
427                    f.as_ref().clone()
428                }
429            })
430            .collect();
431
432        Schema::new_with_metadata(new_fields, schema.metadata().clone())
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use super::*;
439    use bytes::Bytes;
440    use parquet::file::reader::{FileReader, SerializedFileReader};
441    use parquet::record::reader::RowIter;
442    use tokio_util::codec::Encoder;
443    use vector_core::event::LogEvent;
444
445    fn create_event<V>(fields: Vec<(&str, V)>) -> Event
446    where
447        V: Into<vector_core::event::Value>,
448    {
449        let mut log = LogEvent::default();
450        for (key, value) in fields {
451            log.insert(key, value.into());
452        }
453        Event::Log(log)
454    }
455
456    fn assert_parquet_magic(data: &[u8]) {
457        assert!(data.len() >= 4, "Output too short to be valid Parquet");
458        assert_eq!(&data[..4], b"PAR1", "Missing Parquet magic bytes");
459    }
460
461    fn parquet_row_count(data: &[u8]) -> usize {
462        let reader =
463            SerializedFileReader::new(Bytes::copy_from_slice(data)).expect("Invalid Parquet file");
464        let iter = RowIter::from_file_into(Box::new(reader));
465        iter.count()
466    }
467
468    fn parquet_column_names(data: &[u8]) -> Vec<String> {
469        let reader =
470            SerializedFileReader::new(Bytes::copy_from_slice(data)).expect("Invalid Parquet file");
471        let schema = reader.metadata().file_metadata().schema_descr();
472        schema
473            .columns()
474            .iter()
475            .map(|c| c.name().to_string())
476            .collect()
477    }
478
479    fn parse_timestamp(s: &str) -> chrono::DateTime<chrono::Utc> {
480        chrono::DateTime::parse_from_rfc3339(s)
481            .expect("invalid test timestamp")
482            .with_timezone(&chrono::Utc)
483    }
484
485    fn demo_log_event(
486        message: &str,
487        timestamp: chrono::DateTime<chrono::Utc>,
488        status_code: i64,
489        response_time_secs: f64,
490    ) -> Event {
491        use vector_core::event::Value;
492        let mut log = LogEvent::default();
493        log.insert("host", "localhost");
494        log.insert("message", message);
495        log.insert("service", "vector");
496        log.insert("source_type", "demo_logs");
497        log.insert("timestamp", Value::Timestamp(timestamp));
498        log.insert("random_time", Value::Timestamp(timestamp));
499        log.insert("status_code", Value::Integer(status_code));
500        log.insert("response_time_secs", response_time_secs);
501        Event::Log(log)
502    }
503
504    fn sample_events() -> Vec<Event> {
505        const EVENTS: [(&str, &str, i64, f64); 5] = [
506            (
507                "GET /api/v1/health HTTP/1.1",
508                "2026-03-05T20:49:08.037194Z",
509                200,
510                0.037,
511            ),
512            (
513                "POST /api/v1/ingest HTTP/1.1",
514                "2026-03-05T20:49:09.038051Z",
515                201,
516                0.013,
517            ),
518            (
519                "GET /metrics HTTP/1.1",
520                "2026-03-05T20:49:10.036612Z",
521                200,
522                0.022,
523            ),
524            (
525                "DELETE /api/v1/resource HTTP/1.1",
526                "2026-03-05T20:49:11.537131Z",
527                404,
528                0.005,
529            ),
530            (
531                "PATCH /api/v1/config HTTP/1.1",
532                "2026-03-05T20:49:12.037491Z",
533                500,
534                0.091,
535            ),
536        ];
537        EVENTS
538            .iter()
539            .map(|(msg, ts, status, rt)| demo_log_event(msg, parse_timestamp(ts), *status, *rt))
540            .collect()
541    }
542
543    fn encode_autoinfer_and_read_schema(
544        events: Vec<Event>,
545    ) -> (arrow::datatypes::SchemaRef, usize) {
546        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
547
548        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
549            schema_mode: ParquetSchemaMode::AutoInfer,
550            ..Default::default()
551        })
552        .expect("AutoInfer serializer should be created without a static schema");
553
554        let mut buffer = BytesMut::new();
555        serializer
556            .encode(events, &mut buffer)
557            .expect("encoding should succeed");
558
559        let data = buffer.freeze();
560        assert_parquet_magic(&data);
561
562        let builder = ParquetRecordBatchReaderBuilder::try_new(data)
563            .expect("should build ParquetRecordBatchReaderBuilder");
564        let schema = builder.schema().clone();
565        let num_rows: usize = builder
566            .build()
567            .expect("should build reader")
568            .map(|b| b.expect("batch read error").num_rows())
569            .sum();
570        (schema, num_rows)
571    }
572
573    /// Write a temporary Parquet schema file and return its path.
574    ///
575    /// `name` must be unique per test to avoid parallel-test races on the same file.
576    fn write_temp_schema(name: &str, content: &str) -> std::path::PathBuf {
577        use std::io::Write;
578        let path = std::env::temp_dir().join(format!(
579            "vector_parquet_test_{}_{}.schema",
580            std::process::id(),
581            name,
582        ));
583        let mut f = std::fs::File::create(&path).expect("Failed to create schema file");
584        write!(f, "{content}").expect("Failed to write schema");
585        path
586    }
587
588    // ── AutoInfer mode ───────────────────────────────────────────────────────
589
590    #[test]
591    fn encode_input_produces_parquet_output() {
592        let events = sample_events();
593        let n_events = events.len();
594        let (schema, num_rows) = encode_autoinfer_and_read_schema(events);
595
596        assert_eq!(num_rows, n_events, "row count should match event count");
597
598        for field_name in &["timestamp", "random_time"] {
599            let field = schema
600                .field_with_name(field_name)
601                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
602            assert!(
603                matches!(
604                    field.data_type(),
605                    DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC"
606                ),
607                "'{field_name}' should be Timestamp(Microsecond, UTC), got {:?}",
608                field.data_type()
609            );
610        }
611
612        let status_field = schema
613            .field_with_name("status_code")
614            .expect("status_code field should exist");
615        assert_eq!(status_field.data_type(), &DataType::Int64);
616
617        let rt_field = schema
618            .field_with_name("response_time_secs")
619            .expect("response_time_secs field should exist");
620        assert_eq!(rt_field.data_type(), &DataType::Float64);
621
622        for field_name in &["host", "message", "service", "source_type"] {
623            let field = schema
624                .field_with_name(field_name)
625                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
626            assert_eq!(field.data_type(), &DataType::Utf8);
627        }
628    }
629
630    #[test]
631    fn test_parquet_empty_events() {
632        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
633            schema_mode: ParquetSchemaMode::AutoInfer,
634            ..Default::default()
635        })
636        .expect("AutoInfer serializer should succeed");
637
638        let events: Vec<Event> = vec![];
639        let mut buffer = BytesMut::new();
640        serializer
641            .encode(events, &mut buffer)
642            .expect("Empty events should succeed");
643
644        assert!(buffer.is_empty(), "Buffer should be empty for empty events");
645    }
646
647    #[test]
648    fn test_parquet_compression_variants() {
649        let events = vec![create_event(vec![("msg", "hello world")])];
650
651        let compressions = vec![
652            ParquetCompression::None,
653            ParquetCompression::Snappy,
654            ParquetCompression::Zstd { level: 1 },
655            ParquetCompression::Gzip { level: 1 },
656            ParquetCompression::Lz4,
657        ];
658
659        for compression in compressions {
660            let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
661                schema_mode: ParquetSchemaMode::AutoInfer,
662                compression,
663                ..Default::default()
664            })
665            .expect("Failed to create serializer");
666
667            let mut buffer = BytesMut::new();
668            serializer
669                .encode(events.clone(), &mut buffer)
670                .unwrap_or_else(|e| panic!("Encoding with {:?} failed: {}", compression, e));
671
672            let data = buffer.freeze();
673            assert_parquet_magic(&data);
674            assert_eq!(
675                parquet_row_count(&data),
676                1,
677                "Wrong row count for {:?}",
678                compression
679            );
680        }
681    }
682
683    #[test]
684    fn test_parquet_output_has_footer() {
685        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
686            schema_mode: ParquetSchemaMode::AutoInfer,
687            ..Default::default()
688        })
689        .expect("AutoInfer serializer should succeed");
690
691        let events = vec![create_event(vec![("msg", "test")])];
692        let mut buffer = BytesMut::new();
693        serializer.encode(events, &mut buffer).unwrap();
694
695        let data = buffer.freeze();
696        let len = data.len();
697        assert!(len >= 8, "Parquet output too short");
698        assert_eq!(
699            &data[len - 4..],
700            b"PAR1",
701            "Parquet footer magic bytes missing"
702        );
703    }
704
705    #[test]
706    fn test_writer_props_arc_shared() {
707        let serializer = ParquetSerializer::new(ParquetSerializerConfig {
708            schema_mode: ParquetSchemaMode::AutoInfer,
709            ..Default::default()
710        })
711        .expect("AutoInfer serializer should succeed");
712        let cloned = serializer.clone();
713
714        assert_eq!(Arc::strong_count(&serializer.writer_props), 2);
715        drop(cloned);
716        assert_eq!(Arc::strong_count(&serializer.writer_props), 1);
717    }
718
719    #[test]
720    fn test_mixed_log_and_non_log_events() {
721        use vector_core::event::{Metric, MetricKind, MetricValue};
722
723        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
724            schema_mode: ParquetSchemaMode::AutoInfer,
725            ..Default::default()
726        })
727        .expect("AutoInfer serializer should succeed");
728
729        let metric = Metric::new(
730            "cpu.usage",
731            MetricKind::Absolute,
732            MetricValue::Gauge { value: 42.0 },
733        );
734        let events = vec![
735            create_event(vec![("msg", "hello")]),
736            Event::Metric(metric),
737            create_event(vec![("msg", "world")]),
738        ];
739
740        let mut buffer = BytesMut::new();
741        serializer
742            .encode(events, &mut buffer)
743            .expect("Mixed batch should succeed (non-log events dropped)");
744
745        assert_parquet_magic(&buffer);
746        assert_eq!(parquet_row_count(&buffer), 2);
747    }
748
749    // ── Schema file mode ─────────────────────────────────────────────────────
750
751    #[test]
752    fn test_parquet_schema_file() {
753        let schema_path = write_temp_schema(
754            "schema_file",
755            "message logs {\n  required binary name (STRING);\n  optional int64 age;\n}",
756        );
757
758        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
759            "schema_file": schema_path.to_str().unwrap()
760        }))
761        .expect("Config should deserialize");
762
763        let mut serializer =
764            ParquetSerializer::new(config).expect("Should create serializer from schema file");
765
766        let mut log = LogEvent::default();
767        log.insert("name", "alice");
768
769        let mut buffer = BytesMut::new();
770        serializer
771            .encode(vec![Event::Log(log)], &mut buffer)
772            .expect("Encoding with schema file should succeed");
773
774        let data = buffer.freeze();
775        assert_parquet_magic(&data);
776        assert_eq!(parquet_row_count(&data), 1);
777
778        let columns = parquet_column_names(&data);
779        assert_eq!(columns, vec!["name", "age"]);
780    }
781
782    #[test]
783    fn test_parquet_schema_file_not_found_error() {
784        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
785            "schema_file": "/nonexistent/path/schema.parquet"
786        }))
787        .expect("Config should deserialize");
788
789        let result = ParquetSerializer::new(config);
790        assert!(result.is_err(), "Missing schema file should error");
791        assert!(
792            result.unwrap_err().to_string().contains("Failed to read"),
793            "Error should mention file read failure"
794        );
795    }
796
797    #[test]
798    fn test_parquet_schema_file_invalid_syntax_error() {
799        let schema_path = write_temp_schema(
800            "invalid_syntax",
801            "this is not valid parquet schema syntax !!!",
802        );
803
804        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
805            "schema_file": schema_path.to_str().unwrap()
806        }))
807        .expect("Config should deserialize");
808
809        let result = ParquetSerializer::new(config);
810        assert!(result.is_err(), "Invalid Parquet schema should error");
811        assert!(
812            result
813                .unwrap_err()
814                .to_string()
815                .contains("Failed to parse Parquet schema"),
816            "Error should mention parsing failure"
817        );
818    }
819
820    #[test]
821    fn test_parquet_no_schema_error() {
822        let config = ParquetSerializerConfig::default();
823        let result = ParquetSerializer::new(config);
824        assert!(
825            result.is_err(),
826            "Should fail without schema_file or auto_infer"
827        );
828    }
829
830    // ── Schema mode: strict / relaxed ────────────────────────────────────────
831
832    #[test]
833    fn test_parquet_strict_mode_rejects_extra_fields() {
834        let schema_path = write_temp_schema(
835            "strict_rejects",
836            "message logs {\n  required binary name (STRING);\n}",
837        );
838
839        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
840            schema_file: Some(schema_path),
841            schema_mode: ParquetSchemaMode::Strict,
842            ..Default::default()
843        })
844        .expect("Failed to create strict serializer");
845
846        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
847        let mut buffer = BytesMut::new();
848        let result = serializer.encode(events, &mut buffer);
849        assert!(result.is_err(), "Strict mode should reject extra fields");
850        assert!(result.unwrap_err().to_string().contains("city"));
851    }
852
853    #[test]
854    fn test_parquet_strict_mode_allows_schema_fields() {
855        let schema_path = write_temp_schema(
856            "strict_allows",
857            "message logs {\n  required binary name (STRING);\n  required binary level (STRING);\n}",
858        );
859
860        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
861            schema_file: Some(schema_path),
862            schema_mode: ParquetSchemaMode::Strict,
863            ..Default::default()
864        })
865        .expect("Failed to create strict serializer");
866
867        let mut log = LogEvent::default();
868        log.insert("name", "test");
869        log.insert("level", "info");
870
871        let mut buffer = BytesMut::new();
872        assert!(
873            serializer
874                .encode(vec![Event::Log(log)], &mut buffer)
875                .is_ok(),
876            "Strict mode should pass when all fields match schema"
877        );
878    }
879
880    #[test]
881    fn test_parquet_relaxed_mode_drops_extra_fields() {
882        let schema_path = write_temp_schema(
883            "relaxed_drops",
884            "message logs {\n  required binary name (STRING);\n}",
885        );
886
887        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
888            schema_file: Some(schema_path),
889            schema_mode: ParquetSchemaMode::Relaxed,
890            ..Default::default()
891        })
892        .expect("Failed to create relaxed serializer");
893
894        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
895        let mut buffer = BytesMut::new();
896        serializer
897            .encode(events, &mut buffer)
898            .expect("Relaxed mode should drop extra fields silently");
899
900        let data = buffer.freeze();
901        assert_parquet_magic(&data);
902        assert_eq!(parquet_row_count(&data), 1);
903        let columns = parquet_column_names(&data);
904        assert_eq!(columns, vec!["name"]);
905    }
906
907    #[test]
908    fn test_parquet_type_mismatch_returns_error() {
909        let schema_path =
910            write_temp_schema("type_mismatch", "message logs {\n  required int64 name;\n}");
911
912        let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
913            schema_file: Some(schema_path),
914            schema_mode: ParquetSchemaMode::Relaxed,
915            ..Default::default()
916        })
917        .expect("Failed to create serializer");
918
919        let events = vec![create_event(vec![("name", "not_an_integer")])];
920        let mut buffer = BytesMut::new();
921        let result = serializer.encode(events, &mut buffer);
922        assert!(result.is_err(), "Type mismatch should return an error");
923        let err = result.unwrap_err().to_string();
924        assert!(
925            err.contains("Int64"),
926            "Error should mention the expected type, got: {err}"
927        );
928    }
929
930    #[test]
931    fn test_parquet_schema_file_binary_without_string_annotation_rejected() {
932        // Native Parquet "binary" without (STRING) annotation resolves to Arrow Binary,
933        // which is rejected at config time.
934        let schema_path = write_temp_schema(
935            "binary_rejected",
936            "message logs {\n  required binary name (STRING);\n  optional binary raw_data;\n}",
937        );
938
939        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
940            "schema_file": schema_path.to_str().unwrap()
941        }))
942        .expect("Config should deserialize");
943
944        let result = ParquetSerializer::new(config);
945        assert!(
946            result.is_err(),
947            "Parquet binary without STRING annotation should be rejected"
948        );
949        assert!(
950            result.unwrap_err().to_string().contains("raw_data"),
951            "Error should name the offending field"
952        );
953    }
954}