1#![allow(clippy::let_underscore_must_use)]
3
4use std::collections::HashSet;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
15use arrow::error::ArrowError;
16use arrow::json::reader::infer_json_schema_from_iterator;
17use arrow::record_batch::RecordBatch;
18use bytes::{BufMut, BytesMut};
19use derivative::Derivative;
20use parquet::arrow::ArrowWriter;
21use parquet::basic::ZstdLevel;
22use parquet::basic::{Compression as ParquetCodecCompression, GzipLevel};
23use parquet::file::properties::WriterProperties;
24use std::io::{Error, ErrorKind};
25use tracing::warn;
26use vector_common::internal_event::{
27 ComponentEventsDropped, Count, InternalEventHandle, Registered, UNINTENTIONAL, emit, register,
28};
29use vector_config::configurable_component;
30use vector_core::event::Event;
31
32use super::arrow::{ArrowEncodingError, build_record_batch};
33use crate::encoding::format::arrow::vector_log_events_to_json_values;
34use crate::internal_events::{ArrowWriterError, JsonSerializationError, SchemaGenerationError};
35
36type EventsDroppedError = ComponentEventsDropped<'static, UNINTENTIONAL>;
37
38#[configurable_component]
40#[derive(Default, Copy, Clone, Debug, PartialEq)]
41#[configurable(metadata(
42 docs::enum_tag_description = "Compression codec applied per column page inside the Parquet file."
43))]
44#[serde(tag = "algorithm", rename_all = "snake_case")]
45pub enum ParquetCompression {
46 Zstd {
48 #[configurable(validation(range(min = 1, max = 21)))]
50 level: u8,
51 },
52 Gzip {
54 #[configurable(validation(range(min = 1, max = 9)))]
56 level: u8,
57 },
58
59 #[default]
61 Snappy,
62
63 Lz4,
65
66 None,
68}
69
70impl TryFrom<ParquetCompression> for ParquetCodecCompression {
71 type Error = parquet::errors::ParquetError;
72 fn try_from(
73 value: ParquetCompression,
74 ) -> Result<ParquetCodecCompression, parquet::errors::ParquetError> {
75 match value {
76 ParquetCompression::None => Ok(ParquetCodecCompression::UNCOMPRESSED),
77 ParquetCompression::Snappy => Ok(ParquetCodecCompression::SNAPPY),
78 ParquetCompression::Zstd { level } => Ok(ParquetCodecCompression::ZSTD(
79 ZstdLevel::try_new(level.into())?,
80 )),
81 ParquetCompression::Gzip { level } => Ok(ParquetCodecCompression::GZIP(
82 GzipLevel::try_new(level.into())?,
83 )),
84 ParquetCompression::Lz4 => Ok(ParquetCodecCompression::LZ4_RAW),
85 }
86 }
87}
88
/// Controls how events are checked against, or used to derive, the output schema.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ParquetSchemaMode {
    /// Encode against the schema from `schema_file` without any extra field validation.
    ///
    /// This is the default mode.
    #[default]
    Relaxed,
    /// Encode against the schema from `schema_file`, and fail the batch if any log event
    /// has a top-level field that is not present in the schema.
    Strict,
    /// Infer the schema from the events of each encoded batch; no `schema_file` is required.
    AutoInfer,
}
102
/// Configuration for the Parquet serializer.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct ParquetSerializerConfig {
    /// Path to a file containing the schema in Parquet message-type syntax.
    ///
    /// Required unless `schema_mode` is `auto_infer`.
    #[serde(default)]
    pub schema_file: Option<PathBuf>,

    /// Compression codec applied to the Parquet output.
    #[serde(default)]
    #[configurable(derived)]
    pub compression: ParquetCompression,

    /// How events are validated against, or used to infer, the schema.
    #[serde(default)]
    #[configurable(derived)]
    pub schema_mode: ParquetSchemaMode,
}
129
130impl ParquetSerializerConfig {
131 fn resolve_schema(&self) -> Result<Schema, Box<dyn std::error::Error + Send + Sync>> {
133 if self.schema_mode == ParquetSchemaMode::AutoInfer {
134 return Ok(Schema::empty());
135 }
136
137 let path = self
138 .schema_file
139 .as_ref()
140 .ok_or("schema_file is required unless schema_mode is auto_infer")?;
141
142 let content = read_schema_file(path, "schema_file")?;
143 let parquet_type = parquet::schema::parser::parse_message_type(&content)
144 .map_err(|e| format!("Failed to parse Parquet schema: {e}"))?;
145 let schema_desc = parquet::schema::types::SchemaDescriptor::new(Arc::new(parquet_type));
146 let arrow_schema = parquet::arrow::parquet_to_arrow_schema(&schema_desc, None)
147 .map_err(|e| format!("Failed to convert Parquet schema to Arrow: {e}"))?;
148 Ok(arrow_schema)
149 }
150
151 pub fn input_type(&self) -> vector_core::config::DataType {
153 vector_core::config::DataType::Log
154 }
155
156 pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
158 vector_core::schema::Requirement::empty()
159 }
160}
161
/// Reads a schema file into a `String`, refusing anything larger than 10 MiB.
///
/// `field_name` is the configuration field the path came from; it is only used to
/// build error messages.
///
/// The file is opened once, and both the size check and the read go through that
/// handle. The read itself is capped, so files whose reported length is not
/// trustworthy (for example special files that report a length of zero) cannot
/// make this function read an unbounded amount of data.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read, exceeds the size limit,
/// or is not valid UTF-8.
fn read_schema_file(
    path: &std::path::Path,
    field_name: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    use std::io::Read;

    const MAX_SCHEMA_FILE_SIZE: u64 = 10 * 1024 * 1024;

    let display = path.display();
    let read_error = |e: std::io::Error| format!("Failed to read {field_name} '{display}': {e}");

    let file = std::fs::File::open(path).map_err(&read_error)?;
    let metadata = file.metadata().map_err(&read_error)?;

    // Fast path: reject obviously oversized files without reading them.
    if metadata.len() > MAX_SCHEMA_FILE_SIZE {
        return Err(format!(
            "{field_name} '{display}' is too large ({} bytes, max {MAX_SCHEMA_FILE_SIZE})",
            metadata.len()
        )
        .into());
    }

    // Read at most one byte past the limit so an oversized stream is detectable
    // even when the reported length was wrong.
    let mut bytes = Vec::new();
    file.take(MAX_SCHEMA_FILE_SIZE + 1)
        .read_to_end(&mut bytes)
        .map_err(&read_error)?;

    let over_limit = u64::try_from(bytes.len()).map_or(true, |len| len > MAX_SCHEMA_FILE_SIZE);
    if over_limit {
        return Err(format!(
            "{field_name} '{display}' is too large (more than {MAX_SCHEMA_FILE_SIZE} bytes)"
        )
        .into());
    }

    String::from_utf8(bytes)
        .map_err(|e| format!("Failed to read {field_name} '{display}': {e}").into())
}
180
181fn reject_unsupported_arrow_types(
186 schema: &Schema,
187) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
188 fn check_field(field: &Field, path: &str, bad: &mut Vec<String>) {
189 let name = if path.is_empty() {
190 field.name().to_string()
191 } else {
192 format!("{path}.{}", field.name())
193 };
194 match field.data_type() {
195 DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
196 bad.push(format!("'{name}' ({:?})", field.data_type()));
197 }
198 DataType::Struct(fields) => {
199 for f in fields {
200 check_field(f, &name, bad);
201 }
202 }
203 DataType::List(inner) | DataType::LargeList(inner) => {
204 check_field(inner, &name, bad);
205 }
206 DataType::Map(entries_field, _) => {
207 if let DataType::Struct(kv) = entries_field.data_type() {
208 for f in kv {
209 check_field(f, &name, bad);
210 }
211 }
212 }
213 _ => {}
214 }
215 }
216
217 let mut bad = Vec::new();
218 for field in schema.fields() {
219 check_field(field, "", &mut bad);
220 }
221 if !bad.is_empty() {
222 return Err(format!(
223 "Schema contains binary field(s) unsupported by the JSON-based Arrow encoder: {}. \
224 Use Utf8 for base64/hex-encoded data instead.",
225 bad.join(", ")
226 )
227 .into());
228 }
229 Ok(())
230}
231
/// Serializer that encodes batches of log events as Parquet data.
#[derive(Derivative)]
#[derivative(Debug, Clone)]
pub struct ParquetSerializer {
    /// Arrow schema used to build record batches; replaced per batch in `AutoInfer` mode.
    schema: SchemaRef,
    /// Parquet writer settings (including compression), shared between clones.
    writer_props: Arc<WriterProperties>,
    /// Schema handling mode taken from the configuration.
    schema_mode: ParquetSchemaMode,
    /// Top-level field names of the schema resolved at construction; consulted in `Strict` mode.
    schema_field_names: HashSet<String>,

    /// Registered handle used to report events dropped by this serializer.
    #[derivative(Debug = "ignore")]
    events_dropped_handle: Registered<EventsDroppedError>,
}
245
246impl ParquetSerializer {
247 pub fn new(
249 config: ParquetSerializerConfig,
250 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> {
251 let schema = config.resolve_schema()?;
252 reject_unsupported_arrow_types(&schema)?;
253 let schema_ref = SchemaRef::new(schema);
254
255 let schema_field_names = schema_ref
256 .fields()
257 .iter()
258 .map(|f| f.name().clone())
259 .collect::<HashSet<_>>();
260
261 let writer_props = Arc::new(
262 WriterProperties::builder()
263 .set_compression(config.compression.try_into()?)
264 .build(),
265 );
266
267 Ok(Self {
268 schema: schema_ref,
269 writer_props,
270 schema_mode: config.schema_mode,
271 schema_field_names,
272 events_dropped_handle: register(EventsDroppedError::from(
273 "Events could not be serialized to parquet",
274 )),
275 })
276 }
277
278 pub const fn content_type(&self) -> &'static str {
280 "application/vnd.apache.parquet"
281 }
282
283 fn write_record_batch(
289 record_batch: &RecordBatch,
290 buffer: &mut BytesMut,
291 writer_props: &WriterProperties,
292 ) -> Result<(), parquet::errors::ParquetError> {
293 let mut writer = ArrowWriter::try_new(
294 buffer.writer(),
295 Arc::clone(record_batch.schema_ref()),
296 Some(writer_props.clone()),
297 )
298 .inspect_err(|e| {
299 emit(ArrowWriterError { error: e });
300 })?;
301
302 writer.write(record_batch).inspect_err(|e| {
303 emit(ArrowWriterError { error: e });
304 })?;
305
306 writer.close().inspect_err(|e| {
307 emit(ArrowWriterError { error: e });
308 })?;
309
310 Ok(())
311 }
312}
313
314impl tokio_util::codec::Encoder<Vec<Event>> for ParquetSerializer {
315 type Error = vector_common::Error;
316
317 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
318 if events.is_empty() {
319 return Ok(());
320 }
321
322 let json_values = match vector_log_events_to_json_values(&events) {
323 Ok(values) => values,
324 Err(e) => {
325 emit(JsonSerializationError { error: &e });
326 return Err(Box::new(e));
327 }
328 };
329
330 let non_log_count = events.len() - json_values.len();
331
332 if non_log_count > 0 {
333 warn!(
334 message = "Non-log events dropped by Parquet encoder ",
335 %non_log_count,
336 internal_log_rate_secs = 10,
337 );
338 self.events_dropped_handle.emit(Count(non_log_count))
339 }
340
341 if json_values.is_empty() {
342 return Ok(());
343 }
344
345 match self.schema_mode {
346 ParquetSchemaMode::Strict => {
348 for event in &events {
349 if let Some(log) = event.maybe_as_log()
350 && let Some(object_map) = log.as_map()
351 {
352 for top_level in object_map.keys() {
353 if !self.schema_field_names.contains(top_level.as_str()) {
354 return Err(Box::new(ArrowEncodingError::SchemaFetchError {
355 message: format!(
356 "Strict schema mode: event contains field '{top_level}' not in schema",
357 ),
358 }));
359 }
360 }
361 }
362 }
363 }
364 ParquetSchemaMode::AutoInfer => {
365 let schema = ParquetSchemaGenerator::infer_schema(&json_values)?;
366 self.schema = Arc::new(ParquetSchemaGenerator::try_normalize_schema(
367 &events, schema,
368 ));
369 }
370 ParquetSchemaMode::Relaxed => {}
371 }
372
373 let record_batch =
374 build_record_batch(Arc::clone(&self.schema), &json_values).map_err(Box::new)?;
375
376 Self::write_record_batch(&record_batch, buffer, &self.writer_props).map_err(Box::new)?;
377
378 Ok(())
379 }
380}
381
382pub struct ParquetSchemaGenerator {}
383
384impl ParquetSchemaGenerator {
385 pub fn infer_schema(events: &[serde_json::Value]) -> Result<Schema, Error> {
386 let schema = infer_json_schema_from_iterator(events.iter().map(Ok::<_, ArrowError>))
387 .map_err(|e| {
388 emit(SchemaGenerationError { error: &e });
389 Error::new(ErrorKind::InvalidData, e.to_string())
390 })?;
391
392 Ok(schema)
393 }
394
395 fn try_normalize_schema(events: &[Event], schema: Schema) -> Schema {
398 let mut ts_seen: HashSet<String> = HashSet::new();
399 let mut non_ts_seen: HashSet<String> = HashSet::new();
400
401 for event in events.iter().filter_map(Event::maybe_as_log) {
402 if let Some(object_map) = event.as_map() {
403 for (path, value) in object_map {
404 if value.is_timestamp() {
405 ts_seen.insert(path.to_string());
406 } else if !value.is_null() {
407 non_ts_seen.insert(path.to_string());
408 }
409 }
410 }
411 }
412
413 let new_fields: Vec<Field> = schema
414 .fields()
415 .iter()
416 .map(|f| {
417 if ts_seen.contains(f.name()) && !non_ts_seen.contains(f.name()) {
418 Field::new(
419 f.name(),
420 DataType::Timestamp(
421 arrow::datatypes::TimeUnit::Microsecond,
422 Some("UTC".into()),
423 ),
424 f.is_nullable(),
425 )
426 } else {
427 f.as_ref().clone()
428 }
429 })
430 .collect();
431
432 Schema::new_with_metadata(new_fields, schema.metadata().clone())
433 }
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::record::reader::RowIter;
    use tokio_util::codec::Encoder;
    use vector_core::event::LogEvent;

    /// A schema file in the OS temp directory that is deleted when the guard is dropped,
    /// so tests do not leave files behind (including when an assertion panics).
    struct TempSchemaFile {
        path: std::path::PathBuf,
    }

    impl TempSchemaFile {
        /// Location of the schema file on disk.
        fn path(&self) -> &std::path::Path {
            &self.path
        }
    }

    impl Drop for TempSchemaFile {
        fn drop(&mut self) {
            // Best-effort cleanup: a failure to delete must not mask the test result.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a log event from `(key, value)` pairs.
    fn create_event<V>(fields: Vec<(&str, V)>) -> Event
    where
        V: Into<vector_core::event::Value>,
    {
        let mut log = LogEvent::default();
        for (key, value) in fields {
            log.insert(key, value.into());
        }
        Event::Log(log)
    }

    /// Asserts that `data` starts with the Parquet magic bytes.
    fn assert_parquet_magic(data: &[u8]) {
        assert!(data.len() >= 4, "Output too short to be valid Parquet");
        assert_eq!(&data[..4], b"PAR1", "Missing Parquet magic bytes");
    }

    /// Opens `data` as a Parquet file, panicking if it is not one.
    fn open_parquet(data: &[u8]) -> SerializedFileReader<Bytes> {
        SerializedFileReader::new(Bytes::copy_from_slice(data)).expect("Invalid Parquet file")
    }

    /// Number of rows stored in the Parquet file `data`.
    fn parquet_row_count(data: &[u8]) -> usize {
        let iter = RowIter::from_file_into(Box::new(open_parquet(data)));
        iter.count()
    }

    /// Leaf column names of the Parquet file `data`, in schema order.
    fn parquet_column_names(data: &[u8]) -> Vec<String> {
        let reader = open_parquet(data);
        let schema = reader.metadata().file_metadata().schema_descr();
        schema
            .columns()
            .iter()
            .map(|c| c.name().to_string())
            .collect()
    }

    /// Parses an RFC 3339 timestamp into UTC.
    fn parse_timestamp(s: &str) -> chrono::DateTime<chrono::Utc> {
        chrono::DateTime::parse_from_rfc3339(s)
            .expect("invalid test timestamp")
            .with_timezone(&chrono::Utc)
    }

    /// Builds a log event shaped like the output of the `demo_logs` source.
    fn demo_log_event(
        message: &str,
        timestamp: chrono::DateTime<chrono::Utc>,
        status_code: i64,
        response_time_secs: f64,
    ) -> Event {
        use vector_core::event::Value;
        let mut log = LogEvent::default();
        log.insert("host", "localhost");
        log.insert("message", message);
        log.insert("service", "vector");
        log.insert("source_type", "demo_logs");
        log.insert("timestamp", Value::Timestamp(timestamp));
        log.insert("random_time", Value::Timestamp(timestamp));
        log.insert("status_code", Value::Integer(status_code));
        log.insert("response_time_secs", response_time_secs);
        Event::Log(log)
    }

    /// Five demo log events with distinct messages, timestamps and status codes.
    fn sample_events() -> Vec<Event> {
        const EVENTS: [(&str, &str, i64, f64); 5] = [
            (
                "GET /api/v1/health HTTP/1.1",
                "2026-03-05T20:49:08.037194Z",
                200,
                0.037,
            ),
            (
                "POST /api/v1/ingest HTTP/1.1",
                "2026-03-05T20:49:09.038051Z",
                201,
                0.013,
            ),
            (
                "GET /metrics HTTP/1.1",
                "2026-03-05T20:49:10.036612Z",
                200,
                0.022,
            ),
            (
                "DELETE /api/v1/resource HTTP/1.1",
                "2026-03-05T20:49:11.537131Z",
                404,
                0.005,
            ),
            (
                "PATCH /api/v1/config HTTP/1.1",
                "2026-03-05T20:49:12.037491Z",
                500,
                0.091,
            ),
        ];
        EVENTS
            .iter()
            .map(|(msg, ts, status, rt)| demo_log_event(msg, parse_timestamp(ts), *status, *rt))
            .collect()
    }

    /// Creates a serializer in `AutoInfer` mode with default settings otherwise.
    fn autoinfer_serializer() -> ParquetSerializer {
        ParquetSerializer::new(ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            ..Default::default()
        })
        .expect("AutoInfer serializer should be created without a static schema")
    }

    /// Creates a serializer that reads its schema from `schema` and uses `schema_mode`.
    fn serializer_with_schema(
        schema: &TempSchemaFile,
        schema_mode: ParquetSchemaMode,
    ) -> ParquetSerializer {
        ParquetSerializer::new(ParquetSerializerConfig {
            schema_file: Some(schema.path().to_path_buf()),
            schema_mode,
            ..Default::default()
        })
        .expect("Failed to create serializer")
    }

    /// Deserializes a config that only sets `schema_file`, as a user configuration would.
    fn config_from_schema_file(schema: &TempSchemaFile) -> ParquetSerializerConfig {
        serde_json::from_value(serde_json::json!({
            "schema_file": schema.path().to_str().expect("temp path should be valid UTF-8")
        }))
        .expect("Config should deserialize")
    }

    /// Encodes `events` in `AutoInfer` mode and returns the Arrow schema read back
    /// from the output together with the total number of rows.
    fn encode_autoinfer_and_read_schema(
        events: Vec<Event>,
    ) -> (arrow::datatypes::SchemaRef, usize) {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let mut serializer = autoinfer_serializer();

        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("encoding should succeed");

        let data = buffer.freeze();
        assert_parquet_magic(&data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(data)
            .expect("should build ParquetRecordBatchReaderBuilder");
        let schema = builder.schema().clone();
        let num_rows: usize = builder
            .build()
            .expect("should build reader")
            .map(|b| b.expect("batch read error").num_rows())
            .sum();
        (schema, num_rows)
    }

    /// Writes `content` to a uniquely named schema file in the OS temp directory.
    ///
    /// The file name combines the process id with `name`, so each test must pass a
    /// distinct `name`. The file is removed when the returned guard is dropped.
    fn write_temp_schema(name: &str, content: &str) -> TempSchemaFile {
        let path = std::env::temp_dir().join(format!(
            "vector_parquet_test_{}_{}.schema",
            std::process::id(),
            name,
        ));
        std::fs::write(&path, content).expect("Failed to write schema file");
        TempSchemaFile { path }
    }

    #[test]
    fn encode_input_produces_parquet_output() {
        let events = sample_events();
        let n_events = events.len();
        let (schema, num_rows) = encode_autoinfer_and_read_schema(events);

        assert_eq!(num_rows, n_events, "row count should match event count");

        for field_name in &["timestamp", "random_time"] {
            let field = schema
                .field_with_name(field_name)
                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
            assert!(
                matches!(
                    field.data_type(),
                    DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC"
                ),
                "'{field_name}' should be Timestamp(Microsecond, UTC), got {:?}",
                field.data_type()
            );
        }

        let status_field = schema
            .field_with_name("status_code")
            .expect("status_code field should exist");
        assert_eq!(status_field.data_type(), &DataType::Int64);

        let rt_field = schema
            .field_with_name("response_time_secs")
            .expect("response_time_secs field should exist");
        assert_eq!(rt_field.data_type(), &DataType::Float64);

        for field_name in &["host", "message", "service", "source_type"] {
            let field = schema
                .field_with_name(field_name)
                .unwrap_or_else(|_| panic!("field '{field_name}' should exist in schema"));
            assert_eq!(field.data_type(), &DataType::Utf8);
        }
    }

    #[test]
    fn test_parquet_empty_events() {
        let mut serializer = autoinfer_serializer();

        let events: Vec<Event> = vec![];
        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("Empty events should succeed");

        assert!(buffer.is_empty(), "Buffer should be empty for empty events");
    }

    #[test]
    fn test_parquet_compression_variants() {
        let events = vec![create_event(vec![("msg", "hello world")])];

        let compressions = vec![
            ParquetCompression::None,
            ParquetCompression::Snappy,
            ParquetCompression::Zstd { level: 1 },
            ParquetCompression::Gzip { level: 1 },
            ParquetCompression::Lz4,
        ];

        for compression in compressions {
            let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
                schema_mode: ParquetSchemaMode::AutoInfer,
                compression,
                ..Default::default()
            })
            .expect("Failed to create serializer");

            let mut buffer = BytesMut::new();
            serializer
                .encode(events.clone(), &mut buffer)
                .unwrap_or_else(|e| panic!("Encoding with {:?} failed: {}", compression, e));

            let data = buffer.freeze();
            assert_parquet_magic(&data);
            assert_eq!(
                parquet_row_count(&data),
                1,
                "Wrong row count for {:?}",
                compression
            );
        }
    }

    #[test]
    fn test_parquet_output_has_footer() {
        let mut serializer = autoinfer_serializer();

        let events = vec![create_event(vec![("msg", "test")])];
        let mut buffer = BytesMut::new();
        serializer.encode(events, &mut buffer).unwrap();

        let data = buffer.freeze();
        let len = data.len();
        assert!(len >= 8, "Parquet output too short");
        assert_eq!(
            &data[len - 4..],
            b"PAR1",
            "Parquet footer magic bytes missing"
        );
    }

    #[test]
    fn test_writer_props_arc_shared() {
        let serializer = autoinfer_serializer();
        let cloned = serializer.clone();

        // Cloning the serializer must share the writer properties, not copy them.
        assert_eq!(Arc::strong_count(&serializer.writer_props), 2);
        drop(cloned);
        assert_eq!(Arc::strong_count(&serializer.writer_props), 1);
    }

    #[test]
    fn test_mixed_log_and_non_log_events() {
        use vector_core::event::{Metric, MetricKind, MetricValue};

        let mut serializer = autoinfer_serializer();

        let metric = Metric::new(
            "cpu.usage",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let events = vec![
            create_event(vec![("msg", "hello")]),
            Event::Metric(metric),
            create_event(vec![("msg", "world")]),
        ];

        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("Mixed batch should succeed (non-log events dropped)");

        assert_parquet_magic(&buffer);
        assert_eq!(parquet_row_count(&buffer), 2);
    }

    #[test]
    fn test_parquet_schema_file() {
        let schema = write_temp_schema(
            "schema_file",
            "message logs {\n  required binary name (STRING);\n  optional int64 age;\n}",
        );

        let mut serializer = ParquetSerializer::new(config_from_schema_file(&schema))
            .expect("Should create serializer from schema file");

        let mut log = LogEvent::default();
        log.insert("name", "alice");

        let mut buffer = BytesMut::new();
        serializer
            .encode(vec![Event::Log(log)], &mut buffer)
            .expect("Encoding with schema file should succeed");

        let data = buffer.freeze();
        assert_parquet_magic(&data);
        assert_eq!(parquet_row_count(&data), 1);

        let columns = parquet_column_names(&data);
        assert_eq!(columns, vec!["name", "age"]);
    }

    #[test]
    fn test_parquet_schema_file_not_found_error() {
        let config: ParquetSerializerConfig = serde_json::from_value(serde_json::json!({
            "schema_file": "/nonexistent/path/schema.parquet"
        }))
        .expect("Config should deserialize");

        let result = ParquetSerializer::new(config);
        assert!(result.is_err(), "Missing schema file should error");
        assert!(
            result.unwrap_err().to_string().contains("Failed to read"),
            "Error should mention file read failure"
        );
    }

    #[test]
    fn test_parquet_schema_file_invalid_syntax_error() {
        let schema = write_temp_schema(
            "invalid_syntax",
            "this is not valid parquet schema syntax !!!",
        );

        let result = ParquetSerializer::new(config_from_schema_file(&schema));
        assert!(result.is_err(), "Invalid Parquet schema should error");
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Failed to parse Parquet schema"),
            "Error should mention parsing failure"
        );
    }

    #[test]
    fn test_parquet_no_schema_error() {
        let config = ParquetSerializerConfig::default();
        let result = ParquetSerializer::new(config);
        assert!(
            result.is_err(),
            "Should fail without schema_file or auto_infer"
        );
    }

    #[test]
    fn test_parquet_strict_mode_rejects_extra_fields() {
        let schema = write_temp_schema(
            "strict_rejects",
            "message logs {\n  required binary name (STRING);\n}",
        );

        let mut serializer = serializer_with_schema(&schema, ParquetSchemaMode::Strict);

        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
        let mut buffer = BytesMut::new();
        let result = serializer.encode(events, &mut buffer);
        assert!(result.is_err(), "Strict mode should reject extra fields");
        assert!(result.unwrap_err().to_string().contains("city"));
    }

    #[test]
    fn test_parquet_strict_mode_allows_schema_fields() {
        let schema = write_temp_schema(
            "strict_allows",
            "message logs {\n  required binary name (STRING);\n  required binary level (STRING);\n}",
        );

        let mut serializer = serializer_with_schema(&schema, ParquetSchemaMode::Strict);

        let mut log = LogEvent::default();
        log.insert("name", "test");
        log.insert("level", "info");

        let mut buffer = BytesMut::new();
        assert!(
            serializer
                .encode(vec![Event::Log(log)], &mut buffer)
                .is_ok(),
            "Strict mode should pass when all fields match schema"
        );
    }

    #[test]
    fn test_parquet_relaxed_mode_drops_extra_fields() {
        let schema = write_temp_schema(
            "relaxed_drops",
            "message logs {\n  required binary name (STRING);\n}",
        );

        let mut serializer = serializer_with_schema(&schema, ParquetSchemaMode::Relaxed);

        let events = vec![create_event(vec![("name", "alice"), ("city", "paris")])];
        let mut buffer = BytesMut::new();
        serializer
            .encode(events, &mut buffer)
            .expect("Relaxed mode should drop extra fields silently");

        let data = buffer.freeze();
        assert_parquet_magic(&data);
        assert_eq!(parquet_row_count(&data), 1);
        let columns = parquet_column_names(&data);
        assert_eq!(columns, vec!["name"]);
    }

    #[test]
    fn test_parquet_type_mismatch_returns_error() {
        let schema =
            write_temp_schema("type_mismatch", "message logs {\n  required int64 name;\n}");

        let mut serializer = serializer_with_schema(&schema, ParquetSchemaMode::Relaxed);

        let events = vec![create_event(vec![("name", "not_an_integer")])];
        let mut buffer = BytesMut::new();
        let result = serializer.encode(events, &mut buffer);
        assert!(result.is_err(), "Type mismatch should return an error");
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Int64"),
            "Error should mention the expected type, got: {err}"
        );
    }

    #[test]
    fn test_parquet_schema_file_binary_without_string_annotation_rejected() {
        // `raw_data` has no STRING annotation, so it converts to an Arrow binary column.
        let schema = write_temp_schema(
            "binary_rejected",
            "message logs {\n  required binary name (STRING);\n  optional binary raw_data;\n}",
        );

        let result = ParquetSerializer::new(config_from_schema_file(&schema));
        assert!(
            result.is_err(),
            "Parquet binary without STRING annotation should be rejected"
        );
        assert!(
            result.unwrap_err().to_string().contains("raw_data"),
            "Error should name the offending field"
        );
    }
}